Source code for ensightreader

# Copyright (c) 2022-2025 Tomas Karabela
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.


import io
import mmap as _mmap
import os
import os.path as op
import re
import warnings
from contextlib import contextmanager
from dataclasses import dataclass, field
from enum import Enum
from typing import BinaryIO, Dict, Generator, List, Optional, TextIO, Tuple, Type, TypeVar, Union, Iterator, Mapping, \
    Sequence, Iterable

import numpy as np
import numpy.typing as npt

T = TypeVar('T')
TNum = TypeVar('TNum', np.int32, np.float32)
SeekableBufferedReader = Union[BinaryIO, _mmap.mmap]
SeekableBufferedWriter = Union[BinaryIO, _mmap.mmap]
Float32NDArray = npt.NDArray[np.float32]
Int32NDArray = npt.NDArray[np.int32]

__version__ = "0.13.1"

__all__ = [
    "read_case",

    "EnsightReaderError",
    "EnsightReaderWarning",
    "IdHandling",
    "ChangingGeometry",
    "VariableLocation",
    "VariableType",
    "ElementType",
    "Timeset",
    "UnstructuredElementBlock",
    "GeometryPart",
    "EnsightGeometryFile",
    "EnsightVariableFile",
    "EnsightGeometryFileSet",
    "EnsightVariableFileSet",
    "EnsightConstantVariable",
    "EnsightCaseFile",
]


def add_exception_note(e: Exception, note: str) -> None:
    if hasattr(e, "add_note"):  # Python 3.11+
        e.add_note(note)


@contextmanager
def add_exception_note_block(note: str) -> Iterator[None]:
    try:
        yield
    except Exception as e:
        add_exception_note(e, note)
        raise


[docs] class EnsightReaderError(Exception): """ Error raised when parsing EnSight Gold binary files Attributes: file_path (str): path to file where the error was encountered file_offset (int): approximate seek position of the error (this may be a bit past the place where the error is - it's the seek position when this exception was raised) file_lineno (int): line number of the error (this only applies to errors in ``*.case`` file, as other files are binary) """ def __init__(self, msg: str, fp: Optional[Union[TextIO, SeekableBufferedReader]] = None, lineno: Optional[int] = None): self.file_path = getattr(fp, "name", None) try: self.file_offset = fp.tell() if fp else None except OSError: self.file_offset = None self.file_lineno = lineno if lineno is not None: message = f"{msg} (path={self.file_path}, line={self.file_lineno})" else: message = f"{msg} (path={self.file_path}, offset={self.file_offset})" super(EnsightReaderError, self).__init__(message)
[docs] class EnsightReaderWarning(Warning): """ Warning raised when parsing EnSight Gold binary files Attributes: file_path (str): path to file where the error was encountered file_offset (int): approximate seek position of the error (this may be a bit past the place where the error is - it's the seek position when this exception was raised) file_lineno (int): line number of the error (this only applies to errors in ``*.case`` file, as other files are binary) """ def __init__( self, msg: str, fp: Optional[Union[TextIO, SeekableBufferedReader]] = None, lineno: Optional[int] = None, file_path: Optional[Union[str, os.PathLike[str]]] = None ): self.file_path = file_path if file_path is not None else getattr(fp, "name", None) try: self.file_offset = fp.tell() if fp else None except OSError: self.file_offset = None self.file_lineno = lineno if self.file_path is None: message = msg else: if self.file_lineno is not None: message = f"{msg} (path={self.file_path}, line={self.file_lineno})" elif self.file_offset is not None: message = f"{msg} (path={self.file_path}, offset={self.file_offset})" else: message = f"{msg} (path={self.file_path})" super(EnsightReaderWarning, self).__init__(message)
[docs] class IdHandling(Enum): """ Handling of node/element IDs in EnSight Gold geometry file. This is defined in geometry file header and describes whether IDs are present in the file or not. """ OFF = "off" GIVEN = "given" ASSIGN = "assign" IGNORE = "ignore" @property def ids_present(self) -> bool: """Return True if IDs are present in geometry file, otherwise False""" return self in (self.GIVEN, self.IGNORE) def __str__(self) -> str: return self.value
[docs] class ChangingGeometry(Enum): """ Additional information about transient geometry """ NO_CHANGE = "no_change" COORD_CHANGE = "coord_change" CONN_CHANGE = "conn_change" def __str__(self) -> str: return self.value
[docs] class VariableLocation(Enum): """ Location of variable in EnSight Gold case Whether the variable is defined for cells or nodes. """ PER_ELEMENT = "element" PER_NODE = "node" def __str__(self) -> str: return self.value
[docs] class VariableType(Enum): """ Type of variable in EnSight Gold case .. Note:: Complex variables and "per measured" variables are not supported. """ SCALAR = "scalar" VECTOR = "vector" TENSOR_SYMM = "tensor symm" TENSOR_ASYM = "tensor asym" # COMPLEX_SCALAR = "complex scalar" # COMPLEX_VECTOR = "complex vector" def can_affine_transform(self) -> bool: if self == self.SCALAR: return False elif self == self.VECTOR: return True else: warnings.warn(EnsightReaderWarning("affine_transform is only implemented for vectors, not tensors")) return False def __str__(self) -> str: return self.value
VALUES_FOR_VARIABLE_TYPE = { VariableType.SCALAR: 1, VariableType.VECTOR: 3, VariableType.TENSOR_SYMM: 6, VariableType.TENSOR_ASYM: 9, }
[docs] class ElementType(Enum): """ Element type in EnSight Gold geometry file .. Note:: Ghost cell variants ``g_*`` are not supported. """ POINT = "point" BAR2 = "bar2" BAR3 = "bar3" TRIA3 = "tria3" TRIA6 = "tria6" QUAD4 = "quad4" QUAD8 = "quad8" TETRA4 = "tetra4" TETRA10 = "tetra10" PYRAMID5 = "pyramid5" PYRAMID13 = "pyramid13" PENTA6 = "penta6" PENTA15 = "penta15" HEXA8 = "hexa8" HEXA20 = "hexa20" NSIDED = "nsided" NFACED = "nfaced" # G_POINT = "g_point" # G_BAR2 = "g_bar2" # G_BAR3 = "g_bar3" # G_TRIA3 = "g_tria3" # G_TRIA6 = "g_tria6" # G_QUAD4 = "g_quad4" # G_QUAD8 = "g_quad8" # G_TETRA4 = "g_tetra4" # G_TETRA10 = "g_tetra10" # G_PYRAMID5 = "g_pyramid5" # G_PYRAMID13 = "g_pyramid13" # G_PENTA6 = "g_penta6" # G_PENTA15 = "g_penta15" # G_HEXA8 = "g_hexa8" # G_HEXA20 = "g_hexa20" # G_NSIDED = "g_nsided" # G_NFACED = "g_nfaced" @classmethod def parse_from_line(cls, element_type_line: str) -> "ElementType": m = re.match(r"[a-z0-9_]+", element_type_line) if not m: raise ValueError(f"Unexpected element type line {element_type_line!r}") element_name = m.group(0) element_type = cls(element_name) return element_type @property def dimension(self) -> int: """ Return dimension of element Returns 3 for volume elements, 2 for surface elements, 1 for line elements and 0 for point elements. """ return DIMENSION_PER_ELEMENT[self] @property def nodes_per_element(self) -> int: """ Return number nodes defining the element This only makes sense for elements consisting of constant number of nodes. For NSIDED and NFACED element type, this raises and exception. """ return NODES_PER_ELEMENT[self]
[docs] def has_constant_number_of_nodes_per_element(self) -> bool: """ Return True if element type has constant number of nodes defining each element, else False This is True for all element types except NSIDED and NFACED. """ return self in NODES_PER_ELEMENT
def __str__(self) -> str: return self.value
NODES_PER_ELEMENT = { ElementType.POINT: 1, ElementType.BAR2: 2, ElementType.BAR3: 3, ElementType.TRIA3: 3, ElementType.TRIA6: 6, ElementType.QUAD4: 4, ElementType.QUAD8: 8, ElementType.TETRA4: 4, ElementType.TETRA10: 10, ElementType.PYRAMID5: 5, ElementType.PYRAMID13: 13, ElementType.PENTA6: 6, ElementType.PENTA15: 15, ElementType.HEXA8: 8, ElementType.HEXA20: 20, } DIMENSION_PER_ELEMENT = { ElementType.POINT: 0, ElementType.BAR2: 1, ElementType.BAR3: 1, ElementType.TRIA3: 2, ElementType.TRIA6: 2, ElementType.QUAD4: 2, ElementType.QUAD8: 2, ElementType.TETRA4: 3, ElementType.TETRA10: 3, ElementType.PYRAMID5: 3, ElementType.PYRAMID13: 3, ElementType.PENTA6: 3, ElementType.PENTA15: 3, ElementType.HEXA8: 3, ElementType.HEXA20: 3, ElementType.NSIDED: 2, ElementType.NFACED: 3, } SIZE_INT = SIZE_FLOAT = 4
[docs] class GeometryVisitor: """ Base class for a visitor that visits part geometry This class represents the visitor pattern. `GeometryVisitor.visit_part()` will be called for all parts, and you can then inspect or modify part geometry data in your overridden method. """
[docs] def is_read_only(self) -> bool: """Controls write-ability of the array parameter in `GeometryVisitor.visit_part()` (default: writeable)""" return False
[docs] def visit_part(self, coordinates_arr: Float32NDArray, part: "GeometryPart") -> None: """ Visit part coordinates data (override this method with your custom code) Args: coordinates_arr: Node coordinates given part, see `GeometryPart.read_nodes()`. part: Part info. """ return
[docs] class VariableVisitor: """ Base class for a visitor that visits variable data for parts This class represents the visitor pattern. `VariableVisitor.visit_part()` will be called for all parts (unless `VariableVisitor.visit_variable()` returns False), and you can then inspect or modify the variable data in your overridden method. """
[docs] def is_read_only(self) -> bool: """Controls write-ability of the array parameter in `VariableVisitor.visit_part()` (default: writeable)""" return False
[docs] def visit_variable(self, variable_location: VariableLocation, variable_type: VariableType, variable_name: str) -> bool: """Return False here if you do not wish to visit parts in given variable (default: True, ie. visit)""" return True
[docs] def visit_part(self, data_arr: Float32NDArray, part: "GeometryPart", variable: "EnsightVariableFile") -> None: """ Visit part variable data (override this method with your custom code) Args: data_arr: Variable data for given part, see `EnsightVariableFile.read_node_data()` and `EnsightVariableFile.read_element_data()`. For per-element variables, each element type is visited separately. part: Part info. variable: Variable file. """ return
class _AffineTransformGeometryVisitor(GeometryVisitor): """Geometry visitor implementing an affine transformation""" def __init__(self, m: Float32NDArray) -> None: self.m = m def visit_part(self, coordinates_arr: Float32NDArray, part: "GeometryPart") -> None: apply_affine_transform(coordinates_arr, self.m) class _AffineTransformVariableVisitor(VariableVisitor): """Variable visitor implementing an affine transformation""" def __init__(self, m: Float32NDArray) -> None: self.m = m def visit_variable(self, variable_location: VariableLocation, variable_type: VariableType, variable_name: str) -> bool: return variable_type.can_affine_transform() def visit_part(self, data_arr: Float32NDArray, part: "GeometryPart", variable: "EnsightVariableFile") -> None: apply_affine_transform(data_arr, self.m, translate=False)
[docs] @dataclass class Timeset: """ Description of time set in EnSight Gold case This means a non-decreasing sequence of times for which geometry and/or variable values are known. Attributes: timeset_id: ID of the time set description: label of the time set, or None number_of_steps: number of timesteps filename_numbers: list of numbers for filenames (to be filled in place of ``*`` wildcards) time_values: list of time values (ie. seconds, or something else) """ timeset_id: int description: Optional[str] number_of_steps: int filename_numbers: List[int] time_values: List[float]
[docs] def get_timestep_ids(self) -> Sequence[int]: """Return all timestep IDs""" return range(self.number_of_steps)
@staticmethod def filename_numbers_from_arithmetic_sequence(file_start_number: int, number_of_steps: int, filename_increment: int) -> List[int]: assert filename_increment >= 0 assert number_of_steps >= 0 assert file_start_number >= 0 return [file_start_number + i*filename_increment for i in range(number_of_steps)]
[docs] @dataclass class UnstructuredElementBlock: """ A block of elements of the same type in a part in EnSight Gold binary geometry file To use it: >>> from ensightreader import read_case, ElementType >>> case = read_case("example.case") >>> geofile = case.get_geometry_model() >>> part_names = geofile.get_part_names() >>> part = geofile.get_part_by_name(part_names[0]) >>> with open(geofile.file_path, "rb") as fp_geo: ... for block in part.element_blocks: ... if block.element_type == ElementType.NFACED: ... polyhedra_face_counts, face_node_counts, face_connectivity = block.read_connectivity_nfaced(fp_geo) ... elif block.element_type == ElementType.NSIDED: ... polygon_node_counts, polygon_connectivity = block.read_connectivity_nsided(fp_geo) ... else: ... connectivity = block.read_connectivity(fp_geo) Attributes: offset: offset to 'element type' line in file (eg. 'tria3') number_of_elements: number of elements in this block element_type: type of elements in this block element_id_handling: element ID presence part_id: part number """ offset: int number_of_elements: int element_type: ElementType element_id_handling: IdHandling part_id: int
[docs] def read_element_ids(self, fp: SeekableBufferedReader) -> Optional[Int32NDArray]: """ Read element IDs for this element block, if present .. note:: This method simply returns the element ID array as present in the file; it does not differentiate between ``element id given`` and ``element id ignore``, etc. Args: fp: opened geometry file object in ``"rb"`` mode Returns: 1D array of int32 with element IDs, or None if element IDs are not present in the file """ if not self.element_id_handling.ids_present: return None fp.seek(self.offset) assert read_line(fp).startswith(self.element_type.value) assert read_int(fp) == self.number_of_elements arr = read_ints(fp, self.number_of_elements) return arr
[docs] def read_connectivity(self, fp: SeekableBufferedReader) -> Int32NDArray: """ Read connectivity (for elements other than NSIDED/NFACED) Use this for elements which have constant number of nodes per element (ie. any element type except polygons and polyhedra). Args: fp: opened geometry file object in ``"rb"`` mode Returns: 2D ``(n, k)`` array of int32 with node indices (numbered from 1), where ``n`` is the number of elements and ``k`` is the number of nodes defining each element """ if self.element_type not in NODES_PER_ELEMENT: raise ValueError("Please use other methods for nsided/nfaced") fp.seek(self.offset) assert read_line(fp).startswith(self.element_type.value) assert read_int(fp) == self.number_of_elements if self.element_id_handling.ids_present: fp.seek(self.number_of_elements * SIZE_INT, os.SEEK_CUR) nodes_per_element = NODES_PER_ELEMENT[self.element_type] arr = read_ints(fp, self.number_of_elements * nodes_per_element) return arr.reshape((self.number_of_elements, nodes_per_element), order="C")
[docs] def read_connectivity_nsided(self, fp: SeekableBufferedReader) -> Tuple[Int32NDArray, Int32NDArray]: """ Read connectivity (for NSIDED elements) Args: fp: opened geometry file object in ``"rb"`` mode Returns: tuple ``(polygon_node_counts, polygon_connectivity)`` where ``polygon_node_counts`` is 1D array of type int32 giving number of nodes for each polygon and ``polygon_connectivity`` is 1D array of type int32 giving node indices (numbered from 1) for every polygon """ if self.element_type != ElementType.NSIDED: raise ValueError("Please use other methods for not nsided") fp.seek(self.offset) assert read_line(fp).startswith(self.element_type.value) assert read_int(fp) == self.number_of_elements if self.element_id_handling.ids_present: fp.seek(self.number_of_elements * SIZE_INT, os.SEEK_CUR) polygon_node_counts = read_ints(fp, self.number_of_elements) polygon_connectivity = read_ints(fp, polygon_node_counts.sum()) return polygon_node_counts, polygon_connectivity
[docs] def read_connectivity_nfaced(self, fp: SeekableBufferedReader) -> Tuple[Int32NDArray, Int32NDArray, Int32NDArray]: """ Read connectivity (for NFACED elements) Args: fp: opened geometry file object in ``"rb"`` mode Returns: tuple ``(polyhedra_face_counts, face_node_counts, face_connectivity)`` where ``polyhedra_face_counts`` is 1D array of type int32 giving number of faces for each polygon, ``face_node_counts`` is 1D array of type int32 giving number of nodes for each face for each polygon (ordered as 1st polygon 1st face, 1st polygon 2nd face, ..., 2nd polygon 1st face, ...), ``face_connectivity`` is 1D array of type int32 giving node indices (numbered from 1) """ if self.element_type != ElementType.NFACED: raise ValueError("Please use other methods for not nfaced") fp.seek(self.offset) assert read_line(fp).startswith(self.element_type.value) assert read_int(fp) == self.number_of_elements if self.element_id_handling.ids_present: fp.seek(self.number_of_elements * SIZE_INT, os.SEEK_CUR) polyhedra_face_counts = read_ints(fp, self.number_of_elements) face_node_counts = read_ints(fp, polyhedra_face_counts.sum()) face_connectivity = read_ints(fp, face_node_counts.sum()) return polyhedra_face_counts, face_node_counts, face_connectivity
[docs] @classmethod def from_file(cls, fp: SeekableBufferedReader, element_id_handling: IdHandling, part_id: int) -> "UnstructuredElementBlock": """Used internally by `GeometryPart.from_file()`""" offset = fp.tell() element_type_line = read_line(fp) try: element_type = ElementType.parse_from_line(element_type_line) except ValueError as e: raise EnsightReaderError("Unexpected element type", fp) from e with add_exception_note_block(f"element_type = {element_type}"): number_of_elements = read_int(fp) # skip element IDs if element_id_handling.ids_present: fp.seek(number_of_elements*SIZE_INT, os.SEEK_CUR) if element_type in NODES_PER_ELEMENT: nodes_per_element = NODES_PER_ELEMENT[element_type] fp.seek(nodes_per_element * number_of_elements * SIZE_INT, os.SEEK_CUR) # skip connectivity elif element_type == ElementType.NSIDED: polygon_node_counts = read_ints(fp, number_of_elements) fp.seek(polygon_node_counts.sum() * SIZE_INT, os.SEEK_CUR) elif element_type == ElementType.NFACED: polyhedra_face_counts = read_ints(fp, number_of_elements) face_node_counts = read_ints(fp, polyhedra_face_counts.sum()) fp.seek(face_node_counts.sum() * SIZE_INT, os.SEEK_CUR) # skip connectivity else: raise EnsightReaderError(f"Unsupported element type: {element_type}", fp) return cls( offset=offset, number_of_elements=number_of_elements, element_type=element_type, element_id_handling=element_id_handling, part_id=part_id, )
[docs] @staticmethod def write_element_block(fp: SeekableBufferedWriter, element_type: ElementType, connectivity: Int32NDArray, element_ids: Optional[Int32NDArray] = None) -> None: """ Write element block (not NSIDED/NFACED) to given opened file See `UnstructuredElementBlock.read_connectivity()`. """ assert element_type not in (ElementType.NFACED, ElementType.NFACED) number_of_elements = connectivity.shape[0] assert connectivity.shape[1] == element_type.nodes_per_element if element_ids is not None: assert element_ids.shape == (number_of_elements,) write_line(fp, f"{element_type}") write_int(fp, number_of_elements) if element_ids is not None: write_ints(fp, element_ids) write_ints(fp, connectivity.ravel("C"))
[docs] @staticmethod def write_element_block_nsided(fp: SeekableBufferedWriter, polygon_node_counts: Int32NDArray, polygon_connectivity: Int32NDArray, element_ids: Optional[Int32NDArray] = None) -> None: """ Write NSIDED element block to given opened file See `UnstructuredElementBlock.read_connectivity_nsided()`. """ number_of_elements, = polygon_node_counts.shape assert polygon_connectivity.shape == (polygon_node_counts.sum(),) if element_ids is not None: assert element_ids.shape == (number_of_elements,) write_line(fp, f"{ElementType.NSIDED}") write_int(fp, number_of_elements) if element_ids is not None: write_ints(fp, element_ids) write_ints(fp, polygon_node_counts) write_ints(fp, polygon_connectivity)
[docs] @staticmethod def write_element_block_nfaced(fp: SeekableBufferedWriter, polyhedra_face_counts: Int32NDArray, face_node_counts: Int32NDArray, face_connectivity: Int32NDArray, element_ids: Optional[Int32NDArray] = None) -> None: """ Write NFACED element block to given opened file See `UnstructuredElementBlock.read_connectivity_nfaced()`. """ number_of_elements, = polyhedra_face_counts.shape assert face_node_counts.shape == (polyhedra_face_counts.sum(),) assert face_connectivity.shape == (face_node_counts.sum(),) if element_ids is not None: assert element_ids.shape == (number_of_elements,) write_line(fp, f"{ElementType.NFACED}") write_int(fp, number_of_elements) if element_ids is not None: write_ints(fp, element_ids) write_ints(fp, polyhedra_face_counts) write_ints(fp, face_node_counts) write_ints(fp, face_connectivity)
[docs] @dataclass class GeometryPart: """ A part in EnSight Gold geometry file To use it: >>> import ensightreader >>> case = ensightreader.read_case("example.case") >>> geofile = case.get_geometry_model() >>> part_names = geofile.get_part_names() >>> part = geofile.get_part_by_name(part_names[0]) >>> print(part.is_volume()) >>> print(part.number_of_nodes) Each geometry part has its own set of nodes not shared with other parts. Elements are defined in blocks, where each block may have different type (tetrahedra, wedge, etc.) but all elements in the block have the same type. Attributes: offset: offset to 'part' line in the geometry file part_id: part number part_name: part name ('description' line) number_of_nodes: number of nodes for this part element_blocks: list of element blocks, in order of definition in the file node_id_handling: node ID presence element_id_handling: element ID presence changing_geometry: type of transient changes (coordinate/connectivity/none) """ offset: int part_id: int part_name: str number_of_nodes: int element_blocks: List[UnstructuredElementBlock] node_id_handling: IdHandling element_id_handling: IdHandling changing_geometry: Optional[ChangingGeometry] = None
[docs] def read_nodes(self, fp: SeekableBufferedReader) -> Float32NDArray: """ Read node coordinates for this part Args: fp: opened geometry file object in ``"rb"`` mode Returns: 2D ``(n, 3)`` array of float32 with node coordinates """ fp.seek(self.offset) assert read_line(fp).startswith("part") assert read_int(fp) == self.part_id assert read_line(fp).startswith(self.part_name) assert read_line(fp).startswith("coordinates") assert read_int(fp) == self.number_of_nodes if self.node_id_handling.ids_present: fp.seek(self.number_of_nodes * SIZE_INT, os.SEEK_CUR) arr = read_floats(fp, 3*self.number_of_nodes) return arr.reshape((self.number_of_nodes, 3), order="F")
[docs] def read_node_ids(self, fp: SeekableBufferedReader) -> Optional[Int32NDArray]: """ Read node IDs for this part, if present .. note:: This method simply returns the node ID array as present in the file; it does not differentiate between ``node id given`` and ``node id ignore``, etc. Args: fp: opened geometry file object in ``"rb"`` mode Returns: 1D array of int32 with node IDs, or None if node IDs are not present in the file """ if not self.node_id_handling.ids_present: return None fp.seek(self.offset) assert read_line(fp).startswith("part") assert read_int(fp) == self.part_id assert read_line(fp).startswith(self.part_name) assert read_line(fp).startswith("coordinates") assert read_int(fp) == self.number_of_nodes arr = read_ints(fp, self.number_of_nodes) return arr
[docs] def is_volume(self) -> bool: """Return True if part contains volume elements""" return any(block.element_type.dimension == 3 for block in self.element_blocks)
[docs] def is_surface(self) -> bool: """Return True if part contains surface elements""" return any(block.element_type.dimension == 2 for block in self.element_blocks)
@property def number_of_elements(self) -> int: """Return number of elements (of all types)""" return sum(block.number_of_elements for block in self.element_blocks)
[docs] def get_number_of_elements_of_type(self, element_type: ElementType) -> int: """Return number of elements (of given type)""" return sum(block.number_of_elements for block in self.element_blocks if block.element_type == element_type)
[docs] @classmethod def from_file(cls, fp: SeekableBufferedReader, node_id_handling: IdHandling, element_id_handling: IdHandling, changing_geometry_per_part: bool) -> "GeometryPart": """Used internally by `EnsightGeometryFile.from_file_path()`""" offset = fp.tell() fp.seek(0, os.SEEK_END) file_len = fp.tell() fp.seek(offset) element_blocks = [] changing_geometry = None part_line = read_line(fp) if not part_line.startswith("part"): raise EnsightReaderError("Expected 'part' line", fp) if changing_geometry_per_part: for changing_geometry_ in ChangingGeometry: if changing_geometry_.value in part_line: changing_geometry = changing_geometry_ break else: raise EnsightReaderError("Expected no_change/coord_change/conn_change in 'part' line", fp) part_id = read_int(fp) part_name = read_line(fp).rstrip("\x00 ") coordinates_line = read_line(fp) if not coordinates_line.startswith("coordinates"): raise EnsightReaderError("Expected 'coordinates' line (other part types not implemented)", fp) number_of_nodes = read_int(fp) # skip node IDs if node_id_handling.ids_present: fp.seek(number_of_nodes * SIZE_INT, os.SEEK_CUR) # skip node coordinates fp.seek(3 * number_of_nodes * SIZE_FLOAT, os.SEEK_CUR) # read element blocks while fp.tell() != file_len: element_type_line = peek_line(fp) if element_type_line.startswith("part"): break # end of this part, stop else: with add_exception_note_block(f"part_id = {part_id} ({part_name})"): element_block = UnstructuredElementBlock.from_file(fp, element_id_handling=element_id_handling, part_id=part_id) element_blocks.append(element_block) return cls( offset=offset, part_id=part_id, part_name=part_name, number_of_nodes=number_of_nodes, element_blocks=element_blocks, node_id_handling=node_id_handling, element_id_handling=element_id_handling, changing_geometry=changing_geometry, )
[docs] @staticmethod def write_part_header(fp: BinaryIO, part_id: int, part_name: str, node_coordinates: Float32NDArray, node_ids: Optional[Int32NDArray] = None) -> None: """ Write part header to given opened file ``node_coordiantes`` are supposed to be (N, 3)-shaped float32 array. """ number_of_nodes = node_coordinates.shape[0] assert node_coordinates.shape[1] == 3 if node_ids is not None: assert node_ids.shape == (number_of_nodes,) write_line(fp, "part") write_int(fp, part_id) write_line(fp, part_name) write_line(fp, "coordinates") write_int(fp, number_of_nodes) if node_ids is not None: write_ints(fp, node_ids) write_floats(fp, node_coordinates.ravel("F"))
[docs] def write_part( self, in_fp: SeekableBufferedReader, out_fp: BinaryIO, out_node_id_handling: IdHandling, out_element_id_handling: IdHandling, out_part_id: Optional[int] = None, out_part_name: Optional[str] = None, ) -> None: """ Write part to other geometry file This method will seek to the end of the file automatically. Usage: >>> import ensightreader, io >>> source_case = ensightreader.read_case("source.case") >>> dest_case = ensightreader.read_case("dest.case") >>> source_geo = source_case.get_geometry_model() >>> dest_geo = dest_case.get_geometry_model() >>> source_part = source_geo.get_part_by_name("my_part") >>> with source_geo.mmap() as source_mm, dest_geo.open_writeable() as dest_fp: .... source_part.write_part( .... source_mm, .... dest_fp, .... out_node_id_handling=dest_geo.node_id_handling, .... out_element_id_handling=dest_geo.element_id_handling, .... ) See Also: `EnsightCaseFile.append_part_geometry()` for high-level method for part copying Args: in_fp: Opened input geometry file (where this `GeometryPart` is from) out_fp: Opened output geometry file for writing (use `EnsightGeometryFile.open_writeable()`, not mmap) out_node_id_handling: Node ID handling of output geometry file out_element_id_handling: Element ID handling of output geometry file out_part_id: Write with different part ID than in input file (default: use current ID) out_part_name: Write with different part name than in input file (default: use current name) """ if out_node_id_handling.ids_present: if self.node_id_handling.ids_present: node_ids = self.read_node_ids(in_fp) else: node_ids = np.arange(self.number_of_nodes, dtype=np.int32) else: node_ids = None out_fp.seek(0, io.SEEK_END) self.write_part_header( out_fp, part_id=self.part_id if out_part_id is None else out_part_id, part_name=self.part_name if out_part_name is None else out_part_name, node_coordinates=self.read_nodes(in_fp), node_ids=node_ids, ) for block in self.element_blocks: if out_element_id_handling.ids_present: if self.element_id_handling.ids_present: element_ids = block.read_element_ids(in_fp) else: element_ids = np.arange(block.number_of_elements, dtype=np.int32) else: element_ids = None if block.element_type == ElementType.NSIDED: polygon_node_counts, polygon_connectivity = block.read_connectivity_nsided(in_fp) UnstructuredElementBlock.write_element_block_nsided( out_fp, polygon_node_counts=polygon_node_counts, polygon_connectivity=polygon_connectivity, element_ids=element_ids, ) elif block.element_type == ElementType.NFACED: polyhedra_face_counts, face_node_counts, face_connectivity = block.read_connectivity_nfaced(in_fp) UnstructuredElementBlock.write_element_block_nfaced( out_fp, polyhedra_face_counts=polyhedra_face_counts, face_node_counts=face_node_counts, face_connectivity=face_connectivity, element_ids=element_ids, ) else: connectivity = block.read_connectivity(in_fp) UnstructuredElementBlock.write_element_block( out_fp, element_type=block.element_type, connectivity=connectivity, element_ids=element_ids, )
def read_array(fp: SeekableBufferedReader, count: int, dtype: Type[TNum]) -> npt.NDArray[TNum]: if isinstance(fp, _mmap.mmap): offset = fp.tell() bytes_to_read = count * dtype().itemsize buffer = fp.read(bytes_to_read) if len(buffer) != bytes_to_read: raise EnsightReaderError(f"Only read {len(buffer)} bytes, expected {bytes_to_read} bytes", fp) # TODO figure out a sane way to ask `mmap.mmap` about its access mode if "ACCESS_WRITE" in str(fp): # For write-through memory-mapped access, simply create an `ndarray` backed by the # mmap, which results in writable `ndarray`; the `fp.read()` call is only used to advance # the file pointer in this case. arr: npt.NDArray[TNum] = np.ndarray((count,), dtype=dtype, buffer=memoryview(fp), offset=offset) return arr else: # For read-only memory-mapped access, we want to wrap `bytes` as returned from the mmap; # this results in non-writeable `ndarray`. arr = np.frombuffer(buffer, dtype=dtype) return arr else: # For regular file access, we allocate buffer first and then read into it, # this results in writeable `ndarray`. arr = np.empty((count,), dtype=dtype) bytes_to_read = arr.data.nbytes n = fp.readinto(arr.data) # type: ignore[attr-defined] if n != bytes_to_read: raise EnsightReaderError(f"Only read {n} bytes, expected {bytes_to_read} bytes", fp) return arr def write_array(fp: SeekableBufferedWriter, data: npt.NDArray[TNum]) -> None: fp.write(data.data) def read_ints(fp: SeekableBufferedReader, count: int) -> Int32NDArray: return read_array(fp, count, np.int32) def write_ints(fp: SeekableBufferedWriter, data: Int32NDArray) -> None: assert np.issubdtype(data.dtype, np.int32) write_array(fp, data) def read_int(fp: SeekableBufferedReader) -> int: return int(read_ints(fp, 1)[0]) def write_int(fp: SeekableBufferedReader, value: int) -> None: write_ints(fp, np.asarray([value], dtype=np.int32)) def read_floats(fp: SeekableBufferedReader, count: int) -> Float32NDArray: return read_array(fp, count, np.float32) def write_floats(fp: SeekableBufferedWriter, data: Float32NDArray) -> None: assert np.issubdtype(data.dtype, np.float32) write_array(fp, data) def read_float(fp: SeekableBufferedReader) -> float: return float(read_floats(fp, 1)[0]) def write_float(fp: SeekableBufferedReader, value: float) -> None: write_floats(fp, np.asarray([value], dtype=np.float32)) def read_string(fp: SeekableBufferedReader, count: int) -> str: data = fp.read(count) if len(data) != count: raise EnsightReaderError(f"Only read {len(data)} bytes, expected {count} bytes", fp) return data.decode("ascii", "replace") def write_string(fp: SeekableBufferedWriter, s: Union[str, bytes]) -> None: if isinstance(s, str): data = s.encode("ascii", "replace") else: data = s fp.write(data) def read_line(fp: SeekableBufferedReader) -> str: return read_string(fp, 80) def peek_line(fp: SeekableBufferedReader) -> str: s = read_string(fp, 80) fp.seek(-len(s), os.SEEK_CUR) return s def write_line(fp: SeekableBufferedWriter, s: Union[str, bytes]) -> None: assert len(s) <= 80 if isinstance(s, str): data = s.encode("ascii", "replace") else: data = s data = data + b"\x00"*(80 - len(data)) write_string(fp, data)
[docs] def apply_affine_transform(arr: Float32NDArray, m: Float32NDArray, translate: bool = True) -> None: """ Do in-place affine transformation of given array of shape (n, 3) using given matrix Expected shape of the matrix is:: [[r00, r01, r02, 0], [r10, r11, r12, 0], [r20, r21, r22, 0], [dx, dy, dz, 1]] The performed operations are: ``arr.dot(m[:3][:3])`` and ``arr += m[3, :3]``. Args: arr: Input data to be transformed m: A 4x4 affine transformation matrix translate: Whether to apply translation (defaults to True) """ rot_matrix = m[:3, :3] translation_vector = m[3, :3] if np.count_nonzero(rot_matrix - np.eye(3)) > 0: arr[:] = arr.dot(rot_matrix) if translate and np.count_nonzero(translation_vector) > 0: arr[:] += translation_vector
[docs] @dataclass class EnsightGeometryFile: """ EnSight Gold binary geometry file To use it: >>> import ensightreader >>> case = ensightreader.read_case("example.case") >>> geofile = case.get_geometry_model() >>> part_names = geofile.get_part_names() >>> part = geofile.get_part_by_name(part_names[0]) This objects contains metadata parsed from EnSight Gold geometry file; it does not hold any node coordinates, connectivity, etc., but after it's been created it can read any such data on demand. Attributes: file_path: path to the actual geometry file (no wildcards) description_line1: first line in header description_line2: second line in header node_id_handling: node ID presence element_id_handling: element ID presence extents: optional extents given in header (xmin, xmax, ymin, ymax, zmin, zmax) parts: dictonary mapping part IDs to `GeometryPart` objects changing_geometry_per_part: whether parts contain information about type of transient changes """ file_path: str description_line1: str description_line2: str node_id_handling: IdHandling element_id_handling: IdHandling extents: Optional[Float32NDArray] parts: Dict[int, GeometryPart] # part ID -> GeometryPart changing_geometry_per_part: bool
[docs] def reload_from_file(self) -> None: """ Parse file again and update internal metadata This is meant to be called after writing to the geometry file, to see the changes. Be sure to flush your writes before calling this method. """ geofile = self.from_file_path( file_path=self.file_path, changing_geometry_per_part=self.changing_geometry_per_part, ) self.description_line1 = geofile.description_line1 self.description_line2 = geofile.description_line2 self.node_id_handling = geofile.node_id_handling self.element_id_handling = geofile.element_id_handling self.extents = geofile.extents self.parts = geofile.parts
[docs] def get_part_names(self) -> List[str]: """Return list of part names""" return [part.part_name for part in self.parts.values()]
[docs] def get_part_ids(self) -> List[int]: """Return list of part IDs""" return [part.part_id for part in self.parts.values()]
[docs] def get_part_by_name(self, name: str) -> Optional[GeometryPart]: """Return part with given name, or None""" for part in self.parts.values(): if part.part_name == name: return part return None
[docs] def get_part_by_id(self, part_id: int) -> Optional[GeometryPart]: """Return part with given part ID, or None""" for part in self.parts.values(): if part.part_id == part_id: return part return None
[docs] def iter_parts(self) -> Iterable[GeometryPart]: """Yield all GeometryParts""" return self.parts.values()
[docs] @classmethod def from_file_path(cls, file_path: Union[str, os.PathLike[str]], changing_geometry_per_part: bool) -> "EnsightGeometryFile": """Parse EnSight Gold geometry file""" extents = None parts = {} with open(file_path, "rb") as fp: fp.seek(0, os.SEEK_END) file_len = fp.tell() fp.seek(0) first_line = read_line(fp) if not first_line.lower().startswith("c binary"): raise EnsightReaderError("Only 'C Binary' files are supported", fp) description_line1 = read_line(fp) description_line2 = read_line(fp) node_id_line = read_line(fp) m = re.match(r"node id ([a-z]+)", node_id_line) if m: node_id_handling = IdHandling(m.group(1)) else: raise EnsightReaderError("Unexpected 'node id' line", fp) element_id_line = read_line(fp) m = re.match(r"element id ([a-z]+)", element_id_line) if m: element_id_handling = IdHandling(m.group(1)) else: raise EnsightReaderError("Unexpected 'element id' line", fp) if fp.tell() != file_len: tmp = peek_line(fp) if tmp.startswith("extents"): _ = read_line(fp) # 'extents' line extents = read_floats(fp, 6) elif tmp.startswith("part"): pass # expected, we can start reading parts else: raise EnsightReaderError("Expected 'extents' or 'part' line", fp) # read parts while fp.tell() != file_len: part = GeometryPart.from_file(fp, node_id_handling=node_id_handling, element_id_handling=element_id_handling, changing_geometry_per_part=changing_geometry_per_part) if part.part_id in parts: raise EnsightReaderError(f"Duplicate part id: {part.part_id}", fp) parts[part.part_id] = part return cls( file_path=str(file_path), description_line1=description_line1, description_line2=description_line2, node_id_handling=node_id_handling, element_id_handling=element_id_handling, extents=extents, parts=parts, changing_geometry_per_part=changing_geometry_per_part, )
[docs] @staticmethod def write_header(fp: BinaryIO, description_line1: str = "Generated by ensightreader", description_line2: str = "", node_id_handling: IdHandling = IdHandling.OFF, element_id_handling: IdHandling = IdHandling.OFF, extents: Optional[Float32NDArray] = None) -> None: """ Writes geometry file header to given opened file Make sure that ``fp`` seek position is at the beginning of the file. """ write_line(fp, "C binary") write_line(fp, description_line1) write_line(fp, description_line2) write_line(fp, f"node id {node_id_handling}") write_line(fp, f"element id {element_id_handling}") if extents is not None: write_line(fp, "extents") write_floats(fp, extents)
[docs] def open(self) -> BinaryIO: """ Return the opened file in read-only binary mode (convenience method) Use this in ``with`` block and pass the resulting object to `GeometryPart` methods. Note: This is a simpler alternative to the similar ``mmap()`` method - it doesn't require you to reason about lifetime of the opened file since you will be getting arrays which own their data (as opposed to views into the memory-mapped file). Usage:: with geometry_file.open() as fp_geo: nodes = part.read_nodes(fp_geo) Equivalent code:: with open(geometry_file.file_path, "rb") as fp_geo: nodes = part.read_nodes(fp_geo) """ return open(self.file_path, "rb")
[docs] def open_writeable(self) -> BinaryIO: """ Return the opened file in read-write binary mode (convenience method) Use this in ``with`` block and pass the resulting object to `GeometryPart` methods. Usage:: with geometry_file.open_writeable() as fp_geo: ... Equivalent code:: with open(geometry_file.file_path, "r+b") as fp_geo: nodes = part.read_nodes(fp_geo) """ return open(self.file_path, "r+b")
[docs] @contextmanager def mmap(self) -> Generator[_mmap.mmap, None, None]: """ Return read-only memorymap of the file (convenience method) Use this in ``with`` block and pass the resulting object to `GeometryPart` methods. Note: This is preferred over the similar ``open()`` method if you don't need a copy of the data since the returned arrays will be backed by the memorymap. Be careful to keep the memorymap around as long as you need the arrays. Usage:: with geometry_file.mmap() as fp_geo: nodes = part.read_nodes(fp_geo) Equivalent code:: with open(geometry_file.file_path, "rb") as fp_geo, _mmap.mmap(fp_geo.fileno(), 0, access=mmap.ACCESS_READ) as mm_geo: nodes = part.read_nodes(mm_geo) """ with open(self.file_path, "rb") as fp, _mmap.mmap(fp.fileno(), 0, access=_mmap.ACCESS_READ) as mm: yield mm
[docs] @contextmanager def mmap_writable(self) -> Generator[_mmap.mmap, None, None]: """ Return writable memorymap of the file (convenience method) Use this in ``with`` block and pass the resulting object to `GeometryPart` methods. Note: This special version of the ``mmap()`` method can be used if you wish to modify the underlying file. Use carefully. Usage:: with geometry_file.mmap_writable() as fp_geo: nodes = part.read_nodes(fp_geo) nodes[:, 0] = 0.0 # set first coordinate to zero for part nodes Equivalent code:: with open(geometry_file.file_path, "r+b") as fp_geo, _mmap.mmap(fp_geo.fileno(), 0, access=mmap.ACCESS_WRITE) as mm_geo: nodes = part.read_nodes(mm_geo) nodes[:, 0] = 0.0 # set X coordinate to zero for part nodes """ with open(self.file_path, "r+b") as fp, _mmap.mmap(fp.fileno(), 0, access=_mmap.ACCESS_WRITE) as mm: yield mm
[docs] def affine_transform(self, m: Float32NDArray) -> None: """ Apply affine transformation to all parts See Also: `apply_affine_transform()` Args: m: 4x4 affine transformation matrix """ self.visit(_AffineTransformGeometryVisitor(m))
[docs] def visit(self, visitor: GeometryVisitor) -> None: """Apply a `GeometryVisitor` to all parts""" with (self.mmap() if visitor.is_read_only() else self.mmap_writable()) as mm: for part in self.iter_parts(): coordinates_arr = part.read_nodes(mm) visitor.visit_part(coordinates_arr, part)
[docs] @dataclass class EnsightVariableFile: """ EnSight Gold binary variable file To use it: >>> import ensightreader >>> case = ensightreader.read_case("example.case") >>> geofile = case.get_geometry_model() >>> velocity_variable = case.get_variable("U") >>> part_names = geofile.get_part_names() >>> part = geofile.get_part_by_name(part_names[0]) >>> with open(velocity_variable.file_path, "rb") as fp_var: ... part_velocity = velocity_variable.read_node_data(fp_var, part.part_id) .. note:: - there are some limitations for per-element variable files, see `EnsightVariableFile.read_element_data()` - ``coordinates partial`` is not supported This objects contains metadata parsed from EnSight Gold variable file; it does not hold any variable data arrays, but after it's been created it can read any such data on demand. Attributes: file_path: path to the actual variable file (no wildcards) description_line: line in header variable_name: name of the variable in case file variable_location: where the variable is defined (elements or nodes) variable_type: type of the variable (scalar, ...) part_offsets: dictionary mapping part IDs to offset to 'part' line in file part_element_offsets: for per-element variables, this holds a dictionary mapping ``(part ID, element type)`` tuples to offset to 'element type' line in file part_per_node_undefined_values: for per-node variables, this holds a dictionary mapping part IDs to value that should be considered as undefined (``coordinates undef``) part_per_element_undefined_values: for per-element variables, this holds a dictionary mapping ``(part ID, element type)`` tuples to value that should be considered as undefined (``element_type undef``) """ file_path: str description_line: str variable_name: str variable_location: VariableLocation variable_type: VariableType part_offsets: Dict[int, int] part_element_offsets: Dict[Tuple[int, ElementType], int] geometry_file: EnsightGeometryFile part_per_node_undefined_values: Dict[int, float] part_per_element_undefined_values: Dict[Tuple[int, ElementType], float]
[docs] def reload_from_file(self) -> None: """ Parse file again and update internal metadata This is meant to be called after writing to the variable file, to see the changes. Be sure to flush your writes before calling this method. """ variable = self.from_file_path( file_path=self.file_path, variable_name=self.variable_name, variable_location=self.variable_location, variable_type=self.variable_type, geofile=self.geometry_file ) self.description_line = variable.description_line self.part_offsets = variable.part_offsets self.part_element_offsets = variable.part_element_offsets self.part_per_node_undefined_values = variable.part_per_node_undefined_values self.part_per_element_undefined_values = variable.part_per_element_undefined_values
[docs] def iter_part_ids(self) -> Iterable[int]: """Yield all part IDs""" return self.part_offsets.keys()
[docs] def iter_part_id_element_types(self, part_id: int) -> Iterable[ElementType]: """Yield all ElementTypes for given part ID""" return (et for pid, et in self.part_element_offsets.keys() if pid == part_id)
[docs] def is_defined_for_part_id(self, part_id: int) -> bool: """Return True if variable is defined for given part, else False""" return part_id in self.part_offsets
[docs] def read_node_data(self, fp: SeekableBufferedReader, part_id: int) -> Optional[Float32NDArray]: """ Read per-node variable data for given part .. note:: Variable is always either per-node or per-element; be sure to call the right method for your data. Args: fp: opened variable file object in "rb" mode part_id: part number for which to read data Returns: If the variable is not defined for the part, None is returned. If the variable is defined and is a scalar, 1D array of float32 is returned. Otherwise the returned value is 2D ``(n, k)`` array of float32 where ``n`` is number of nodes and ``k`` is number of values based on variable type (vector, tensor). """ part = self.geometry_file.parts[part_id] undefined_value = self.part_per_node_undefined_values.get(part_id) if not self.variable_location == VariableLocation.PER_NODE: raise ValueError("Variable is not per node") offset = self.part_offsets.get(part_id) if offset is None: return None fp.seek(offset) assert read_line(fp).startswith("part") assert read_int(fp) == part_id assert read_line(fp).startswith("coordinates") if undefined_value is not None: assert read_float(fp) == undefined_value n = part.number_of_nodes k = VALUES_FOR_VARIABLE_TYPE[self.variable_type] arr = read_floats(fp, n*k) if k > 1: arr = arr.reshape((n, k), order="F") return arr
[docs] def read_element_data(self, fp: SeekableBufferedReader, part_id: int, element_type: ElementType) -> Optional[Float32NDArray]: """ Read per-element variable data for given part and element type Due to how EnSight Gold format works, variable file mirrors the geometry file in that per-element values are (alas) defined separately for each element block. This introduces edge cases (missing element blocks so that the variable could only be partially defined for the part; multiple element blocks of the same element type; etc.) which may be tedious to handle. This implementation relies on the assumption that there are no repeated blocks of the same type. .. note:: Variable is always either per-node or per-element; be sure to call the right method for your data. Args: fp: opened variable file object in "rb" mode part_id: part number for which to read data element_type: element type for which to read data (typically, you want to iterate over element blocks in the part and retrieve data for their respective element types) Returns: If the variable is not defined for the part, None is returned. If the variable is defined and is a scalar, 1D array of float32 is returned. Otherwise the returned value is 2D ``(n, k)`` array of float32 where ``n`` is number of nodes and ``k`` is number of values based on variable type (vector, tensor). """ part = self.geometry_file.parts[part_id] undefined_value = self.part_per_element_undefined_values.get((part_id, element_type)) if not self.variable_location == VariableLocation.PER_ELEMENT: raise ValueError("Variable is not per element") offset = self.part_element_offsets.get((part_id, element_type)) if offset is None: return None fp.seek(offset) assert read_line(fp).startswith(element_type.value) if undefined_value is not None: assert read_float(fp) == undefined_value n = part.get_number_of_elements_of_type(element_type) k = VALUES_FOR_VARIABLE_TYPE[self.variable_type] arr = read_floats(fp, n*k) if k > 1: arr = arr.reshape((n, k), order="F") return arr
[docs] @classmethod def from_file_path(cls, file_path: Union[str, os.PathLike[str]], variable_name: str, variable_location: VariableLocation, variable_type: VariableType, geofile: EnsightGeometryFile) -> "EnsightVariableFile": """Used internally by `EnsightVariableFileSet.get_file()`""" part_offsets = {} part_element_offsets: Dict[Tuple[int, ElementType], int] = {} part_per_node_undefined_values: Dict[int, float] = {} part_per_element_undefined_values: Dict[Tuple[int, ElementType], float] = {} with open(file_path, "rb") as fp: fp.seek(0, os.SEEK_END) file_len = fp.tell() fp.seek(0) description_line = read_line(fp) # read parts while fp.tell() != file_len: part_offset = fp.tell() part_line = read_line(fp) if not part_line.startswith("part"): raise EnsightReaderError(f"Expected 'part' line, got: {part_line!r}", fp) part_id = read_int(fp) if part_id not in geofile.parts: raise EnsightReaderError(f"Variable file has data for part id {part_id}, " f"but this part is not in geofile {geofile.file_path}", fp) if part_id in part_offsets: raise EnsightReaderError(f"Duplicate definition of part id {part_id}", fp) part = geofile.parts[part_id] part_offsets[part_id] = part_offset if variable_location == VariableLocation.PER_NODE: coordinates_line = read_line(fp) if not coordinates_line.startswith("coordinates"): raise EnsightReaderError(f"Expected 'coordinates' line, got: {coordinates_line!r}", fp) if "undef" in coordinates_line: undefined_value = read_float(fp) part_per_node_undefined_values[part_id] = undefined_value elif "partial" in coordinates_line: raise EnsightReaderError(f"'coordinates partial' is not supported (part id {part_id})", fp) part_offsets[part_id] = part_offset # skip data n = part.number_of_nodes k = VALUES_FOR_VARIABLE_TYPE[variable_type] fp.seek(n * k * SIZE_FLOAT, os.SEEK_CUR) elif variable_location == VariableLocation.PER_ELEMENT: while fp.tell() != file_len: part_element_offset = fp.tell() element_type_line = peek_line(fp) if element_type_line.startswith("part"): break # new part starts _ = read_line(fp) # 'element type' line try: element_type = ElementType.parse_from_line(element_type_line) except ValueError as e: raise EnsightReaderError("Bad element type", fp) from e if "undef" in element_type_line: undefined_value = read_float(fp) part_per_element_undefined_values[part_id, element_type] = undefined_value elif "partial" in element_type_line: raise EnsightReaderError(f"'element_type partial' is not supported (part id {part_id})", fp) blocks = [block for block in part.element_blocks if block.element_type == element_type] if not blocks: raise EnsightReaderError(f"Variable file has data for part id {part_id}, " f"element type {element_type}, but this element type " f"is not in geofile {geofile.file_path}", fp) if len(blocks) > 1: raise EnsightReaderError(f"Variable file has data for part id {part_id}, " f"element type {element_type}, but there are multiple " f"element blocks of this type in geofile {geofile.file_path} " f"(handling of this is not implemented, please have only one block " f"of each type)", fp) part_element_offsets[part_id, element_type] = part_element_offset # skip data n = blocks[0].number_of_elements k = VALUES_FOR_VARIABLE_TYPE[variable_type] fp.seek(n * k * SIZE_FLOAT, os.SEEK_CUR) else: raise EnsightReaderError(f"Bad variable location {variable_location}", fp) return cls( file_path=str(file_path), description_line=description_line, variable_name=variable_name, variable_location=variable_location, variable_type=variable_type, part_offsets=part_offsets, part_element_offsets=part_element_offsets, geometry_file=geofile, part_per_node_undefined_values=part_per_node_undefined_values, part_per_element_undefined_values=part_per_element_undefined_values, )
[docs] def write_header( self, fp: BinaryIO, description_line: str = "Generated by ensightreader", ) -> None: """Write initial part of variable file""" write_line(fp, description_line)
[docs] def write_node_data( self, fp: BinaryIO, part_id: int, arr: Float32NDArray, ) -> None: """Write variable data for part (per-node)""" part = self.geometry_file.parts[part_id] if not self.variable_location == VariableLocation.PER_NODE: raise ValueError("Variable is not per node") n = part.number_of_nodes k = VALUES_FOR_VARIABLE_TYPE[self.variable_type] if k > 1: assert arr.shape == (n, k) arr = arr.reshape((n, k), order="F") else: assert arr.shape in ((n,), (n, 1)) write_line(fp, "part") write_int(fp, part_id) write_line(fp, "coordinates") write_floats(fp, arr)
[docs] def write_element_data( self, fp: BinaryIO, part_id: int, arr_per_element: Mapping[ElementType, Float32NDArray], ) -> None: """Write variable data for part (per-element)""" part = self.geometry_file.parts[part_id] if not self.variable_location == VariableLocation.PER_ELEMENT: raise ValueError("Variable is not per element") write_line(fp, "part") write_int(fp, part_id) for block in part.element_blocks: arr = arr_per_element[block.element_type] n = block.number_of_elements k = VALUES_FOR_VARIABLE_TYPE[self.variable_type] if k > 1: assert arr.shape == (n, k) arr = arr.reshape((n, k), order="F") else: assert arr.shape in ((n,), (n, 1)) write_line(fp, f"{block.element_type.value}") write_floats(fp, arr)
[docs] def ensure_data_for_part( self, fp: BinaryIO, part_id: int, default_value: float = 0.0, _reload_from_file: bool = True, ) -> None: """ Append variable definition for given part if variable is undefined currently, otherwise do nothing This method will seek to the end of the file automatically. See Also: `EnsightVariableFile.ensure_data_for_all_parts()` Args: fp: Opened writable variable file (use `EnsightVariableFile.open_writeable()`, not mmap) part_id: Part ID to be defined default_value: Constant value that will be filled in if the variable is not defined """ if self.is_defined_for_part_id(part_id): return fp.seek(0, io.SEEK_END) part = self.geometry_file.parts[part_id] k = VALUES_FOR_VARIABLE_TYPE[self.variable_type] if self.variable_location == VariableLocation.PER_NODE: arr = np.full((part.number_of_nodes, k), default_value, np.float32) self.write_node_data(fp, part_id, arr) elif self.variable_location == VariableLocation.PER_ELEMENT: arr_per_element = { block.element_type: np.full((block.number_of_elements, k), default_value, np.float32) for block in part.element_blocks } self.write_element_data(fp, part_id, arr_per_element) else: raise NotImplementedError("unexpected variable location") if _reload_from_file: fp.flush() self.reload_from_file()
[docs] def ensure_data_for_all_parts(self, fp: BinaryIO, default_value: float = 0.0) -> None: """ Append variable definitions for all parts that are currently undefined This method will seek to the end of the file automatically. Usage: >>> from ensightreader import read_case >>> case = read_case("sphere.case") >>> variable = case.get_variable("RTData") >>> with variable.open_writeable() as fp: ... variable.ensure_data_for_all_parts(fp) See Also: `EnsightVariableFile.ensure_data_for_part()` Args: fp: Opened writable variable file (use `EnsightVariableFile.open_writeable()`, not mmap) default_value: Constant value that will be filled in if the variable is not defined """ for part_id in self.geometry_file.get_part_ids(): self.ensure_data_for_part(fp, part_id, default_value, _reload_from_file=False) fp.flush() self.reload_from_file()
[docs] def open(self) -> BinaryIO: """ Return the opened file in read-only binary mode (convenience method) Use this in ``with`` block and pass the resulting object to ``read_node_data()`` and ``read_element_data()`` methods. Note: This is a simpler alternative to the similar ``mmap()`` method - it doesn't require you to reason about lifetime of the opened file since you will be getting arrays which own their data (as opposed to views into the memory-mapped file). Usage:: with variable_file.open() as fp_var: variable_data = variable.read_node_data(fp_var, part.part_id) Equivalent code:: with open(variable_file.file_path, "rb") as fp_var: variable_data = variable.read_node_data(fp_var, part.part_id) """ return open(self.file_path, "rb")
[docs] def open_writeable(self) -> BinaryIO: """ Return the opened file in read-write binary mode (convenience method) Use this in ``with`` block and pass the resulting object to ``read_node_data()`` and ``read_element_data()`` methods. Usage:: with variable_file.open_writeable() as fp_var: variable_data = variable.read_node_data(fp_var, part.part_id) Equivalent code:: with open(variable_file.file_path, "r+b") as fp_var: variable_data = variable.read_node_data(fp_var, part.part_id) """ return open(self.file_path, "r+b")
[docs] @contextmanager def mmap(self) -> Generator[_mmap.mmap, None, None]: """ Return read-only memorymap of the file (convenience method) Use this in ``with`` block and pass the resulting object to ``read_node_data()`` and ``read_element_data()`` methods. Note: This is preferred over the similar ``open()`` method if you don't need a copy of the data since the returned arrays will be backed by the memorymap. Be careful to keep the memorymap around as long as you need the arrays. Usage:: with variable_file.mmap() as mm_var: variable_data = variable.read_node_data(mm_var, part.part_id) Equivalent code:: with open(variable_file.file_path, "rb") as fp_var, _mmap.mmap(fp_var.fileno(), 0, access=mmap.ACCESS_READ) as mm_var: variable_data = variable.read_node_data(mm_var, part.part_id) """ with open(self.file_path, "rb") as fp, _mmap.mmap(fp.fileno(), 0, access=_mmap.ACCESS_READ) as mm: yield mm
[docs] @contextmanager def mmap_writable(self) -> Generator[_mmap.mmap, None, None]: """ Return writable memorymap of the file (convenience method) Use this in ``with`` block and pass the resulting object to ``read_node_data()`` and ``read_element_data()`` methods. Note: This special version of the ``mmap()`` method can be used if you wish to modify the underlying file. Use carefully. Usage:: with geometry_file.mmap_writable() as fp_geo: nodes = part.read_nodes(fp_geo) nodes[:, 0] = 0.0 # set X coordinate to zero for part nodes Equivalent code:: with open(geometry_file.file_path, "r+b") as fp_geo, _mmap.mmap(fp_geo.fileno(), 0, access=mmap.ACCESS_WRITE) as mm_geo: nodes = part.read_nodes(mm_geo) Usage:: with variable_file.mmap_writable() as mm_var: variable_data = variable.read_node_data(mm_var, part.part_id) variable_data[:] = np.sqrt(variable_data) # apply square root function to the data Equivalent code:: with open(variable_file.file_path, "r+b") as fp_var, _mmap.mmap(fp_var.fileno(), 0, access=mmap.ACCESS_WRITE) as mm_var: variable_data = variable.read_node_data(mm_var, part.part_id) variable_data[:] = np.sqrt(variable_data) # apply square root function to the data """ with open(self.file_path, "r+b") as fp, _mmap.mmap(fp.fileno(), 0, access=_mmap.ACCESS_WRITE) as mm: yield mm
[docs] def affine_transform(self, m: Float32NDArray, geofile: EnsightGeometryFile) -> None: """ Apply affine transformation to all parts See Also: `apply_affine_transform()` Args: m: 4x4 affine transformation matrix geofile: EnsightGeometryFile for the relevant timestep """ self.visit(_AffineTransformVariableVisitor(m), geofile)
[docs] def visit(self, visitor: VariableVisitor, geofile: EnsightGeometryFile) -> None: """ Apply a `VariableVisitor` to all part data Examples: >>> import ensightreader >>> class MyVariableVisitor(ensightreader.VariableVisitor): ... pass # your implementation >>> case = read_case("my.case") >>> case.get_variable("pressure").visit(MyVariableVisitor()) """ if not visitor.visit_variable(self.variable_location, self.variable_type, self.variable_name): return with (self.mmap() if visitor.is_read_only() else self.mmap_writable()) as mm: for part_id in self.iter_part_ids(): part = geofile.get_part_by_id(part_id) if part is None: raise EnsightReaderError(f"Cannot find part id {part_id} in provided geofile {geofile!r}") if self.variable_location == VariableLocation.PER_NODE: data_arr = self.read_node_data(mm, part_id) if data_arr is not None: visitor.visit_part(data_arr, part, self) elif self.variable_location == VariableLocation.PER_ELEMENT: for element_type in self.iter_part_id_element_types(part_id): data_arr = self.read_element_data(mm, part_id, element_type) if data_arr is not None: visitor.visit_part(data_arr, part, self) else: raise NotImplementedError
def fill_wildcard(filename: str, value: int) -> str: return re.sub(r"\*+", lambda m: str(value).zfill(len(m.group(0))), filename, count=1) def strip_quotes(filename: str) -> str: return re.sub('"(.+)"', r"\1", filename)
[docs] @dataclass class EnsightGeometryFileSet: """ Helper object for loading geometry files This is used by `EnsightCaseFile.get_geometry_model()` to give you `EnsightGeometryFile`. Attributes: casefile_dir_path: path to casefile directory (root for relative paths in casefile) timeset: time set in which the geometry is defined, or None if it's not transient filename: path to the data file(s), including ``*`` wildcards if transient changing_geometry_per_part: whether parts contain information about type of transient changes """ casefile_dir_path: str timeset: Optional[Timeset] filename: str changing_geometry_per_part: bool = False _file_cache: Dict[int, EnsightGeometryFile] = field(default_factory=dict, repr=False)
[docs] def get_file(self, timestep: int = 0) -> EnsightGeometryFile: """Return geometry for given timestep (use 0 if not transient)""" if timestep not in self._file_cache: self._file_cache[timestep] = self._get_file(timestep) return self._file_cache[timestep]
def _get_file(self, timestep: int = 0) -> EnsightGeometryFile: if self.timeset is None: timestep_filename = self.filename else: timestep_filename = fill_wildcard(self.filename, self.timeset.filename_numbers[timestep]) path = op.join(self.casefile_dir_path, timestep_filename) return EnsightGeometryFile.from_file_path(path, changing_geometry_per_part=self.changing_geometry_per_part)
[docs] def get_timestep_ids(self) -> Sequence[int]: """Return all timestep IDs""" return self.timeset.get_timestep_ids() if self.timeset is not None else [0]
[docs] def affine_transform(self, m: Float32NDArray) -> None: """ Apply affine transformation to all parts in all timesteps See Also: `apply_affine_transform()` Args: m: 4x4 affine transformation matrix """ self.visit(_AffineTransformGeometryVisitor(m))
[docs] def visit(self, visitor: GeometryVisitor) -> None: """Apply a `GeometryVisitor` to all parts in all timesteps""" for i in self.get_timestep_ids(): self.get_file(i).visit(visitor)
def _clear_cache(self) -> None: # This has to be private because of copies of EnsightGeometryFile inside EnsightVariableFile; # we need to read everything again otherwise it might be inconsistent. self._file_cache.clear()
[docs] @dataclass class EnsightVariableFileSet: """ Helper object for loading variable files This is used by `EnsightCaseFile.get_variable()` to give you `EnsightVariableFile`. Attributes: casefile_dir_path: path to casefile directory (root for relative paths in casefile) timeset: time set in which the variable is defined, or None if it's not transient variable_location: where the variable is defined (elements or nodes) variable_type: type of the variable (scalar, ...) variable_name: name of the variable ('description' field in casefile) filename: path to the data file(s), including ``*`` wildcards if transient geometry_model: instance of `EnsightGeometryFileSet` """ casefile_dir_path: str timeset: Optional[Timeset] variable_location: VariableLocation variable_type: VariableType variable_name: str filename: str geometry_model: EnsightGeometryFileSet _file_cache: Dict[int, EnsightVariableFile] = field(default_factory=dict, repr=False)
[docs] def get_file(self, timestep: int = 0, geofile: Optional[EnsightGeometryFile] = None) -> EnsightVariableFile: """ Return variable for given timestep (use 0 if not transient) If you have a case that has different timesets for variable and geometry, you must set `EnsightGeometryFile` explicitly via the parameter. Otherwise, it will be matched automatically. .. note:: Due to how EnSight Gold format works, we need to have matching geofile already parsed before we attempt to read the variable file. The variable file itself (alas) does not give lengths of the arrays it contains. For transient geometry combined with transient variable, pay extra care because they have to match - different timesteps can have different number of elements, nodes, etc. """ if timestep not in self._file_cache: self._file_cache[timestep] = self._get_file(timestep, geofile) return self._file_cache[timestep]
def _get_file(self, timestep: int = 0, geofile: Optional[EnsightGeometryFile] = None) -> EnsightVariableFile: geofile_ = geofile if geofile is not None else self.get_geofile_for_timestep(timestep) # XXX be sure to match geofile and timestep! if self.timeset is None: timestep_filename = self.filename else: timestep_filename = fill_wildcard(self.filename, self.timeset.filename_numbers[timestep]) path = op.join(self.casefile_dir_path, timestep_filename) return EnsightVariableFile.from_file_path(path, variable_name=self.variable_name, variable_location=self.variable_location, variable_type=self.variable_type, geofile=geofile_)
[docs] def get_geofile_for_timestep(self, timestep: int = 0) -> EnsightGeometryFile: """ Return `EnsightGeometryFile` belonging to given variable timestep If you have a case that has different timesets for variable and geometry, this will raise an exception. """ variable_ts = self.timeset geometry_ts = self.geometry_model.timeset if variable_ts is not None and geometry_ts is not None and variable_ts.timeset_id != geometry_ts.timeset_id: raise ValueError("variable and geometry timesets differ, must provide explicit geofile parameter") elif geometry_ts is not None: return self.geometry_model.get_file(timestep) else: return self.geometry_model.get_file()
[docs] def get_timestep_ids(self) -> Sequence[int]: """Return all timestep IDs""" return self.timeset.get_timestep_ids() if self.timeset is not None else [0]
[docs] def affine_transform(self, m: Float32NDArray) -> None: """ Apply affine transformation Args: m: 4x4 affine transformation matrix """ self.visit(_AffineTransformVariableVisitor(m))
[docs] def visit(self, visitor: VariableVisitor) -> None: """ Apply a `VariableVisitor` to all part data in all timesteps Examples: >>> import ensightreader >>> class MyVariableVisitor(ensightreader.VariableVisitor): ... pass # your implementation >>> case = read_case("my.case") >>> case.variables["pressure"].visit(MyVariableVisitor()) """ if not visitor.visit_variable(self.variable_location, self.variable_type, self.variable_name): return for i in self.get_timestep_ids(): geofile = self.get_geofile_for_timestep(i) self.get_file(i).visit(visitor, geofile)
def _clear_cache(self) -> None: self._file_cache.clear()
[docs] @dataclass class EnsightConstantVariable: """ Constant per case variable Represents ``constant per case`` or ``constant per case file`` variable. Attributes: timeset: time set in which the variable is defined, or None if it's not transient variable_name: name of the variable ('description' field in casefile) values: list of values (it has length 1 for non-transient variables; for transient variable its length corresponds to number of timesteps) """ timeset: Optional[Timeset] variable_name: str values: List[float]
[docs] def get_value(self, timestep: int = 0) -> float: """Return constant value at given timestep (for non-transient constants, use timestep 0)""" return self.values[timestep]
@classmethod def from_casefile_line( cls, key: str, values: List[str], casefile_dir_path: Union[str, os.PathLike[str]] ) -> Tuple["EnsightConstantVariable", Optional[int]]: if "file" in key: # constant per case file if len(values) == 2: ts = None variable_name = values[0] cvfilename = values[1] return cls( timeset=None, variable_name=variable_name, values=read_numbers_from_text_file(op.join(casefile_dir_path, cvfilename), float) ), ts elif len(values) == 3: ts = int(values[0]) variable_name = values[1] cvfilename = values[2] return cls( timeset=None, variable_name=variable_name, values=read_numbers_from_text_file(op.join(casefile_dir_path, cvfilename), float) ), ts else: raise ValueError("Unsupported constant variable line") else: # constant per case if len(values) == 2: ts = None variable_name = values[0] variable_value = float(values[1]) return cls( timeset=None, variable_name=variable_name, values=[variable_value] ), ts elif len(values) < 2: raise ValueError("Unsupported constant variable line") else: ts = int(values[0]) variable_name = values[1] variable_values = [float(x) for x in values[2:]] return cls( timeset=None, variable_name=variable_name, values=variable_values ), ts
def read_numbers_from_text_file(path: Union[str, os.PathLike[str]], type_: Type[T]) -> List[T]: output: List[T] = [] with open(path) as fp: for line in fp: values = line.split() output.extend(map(type_, values)) return output
[docs] @dataclass class EnsightCaseFile: """ EnSight Gold case file To load a case, use: >>> import ensightreader >>> case = ensightreader.read_case("example.case") >>> geofile = case.get_geometry_model() >>> velocity_variable = case.get_variable("U") EnSight Gold casefile is text file with description of the case and links to data files where the actual geometry and variables are stored. Since the case may be transient and therefore have multiple data files for the same variable, etc., there is an indirection: the actual data is accessed using `EnsightGeometryFile` and `EnsightVariableFile` objects, which are constructed using `EnsightGeometryFileSet`, `EnsightVariableFileSet` helper objects inside `EnsightCaseFile`. .. note:: - for geometry, only ``model`` is supported (no ``measured``, ``match`` or ``boundary``) - only unstructured grid (``coordinates``) geometry is supported; structured parts are not supported - filesets and single-file cases are not supported; only ``C Binary`` files are supported - ``change_coords_only`` is not supported - ``changing_geometry_per_part`` is only read and stored, not handled in reading geometry data - some variable types are unsupported, see `VariableType` - ghost cells are not supported - cases with more than one time set are supported, but you may not be able to use `EnsightCaseFile.get_variable()` since the library expects geometry and variable time sets to match; if you know which geofile belongs to which variable file you can read the variable files using `EnsightVariableFileSet.get_file()`. Attributes: casefile_path: path to the ``*.case`` file geometry_model: accessor to the ``model`` geometry data variables: dictionary mapping variable names keys to variable data accessors constant_variables: dictionary mapping constant variable names keys to `EnsightConstantVariable` timesets: dictionary mapping time set IDs to time set description objects """ casefile_path: str geometry_model: EnsightGeometryFileSet variables: Dict[str, EnsightVariableFileSet] # variable name -> EnsightVariableFileSet constant_variables: Dict[str, EnsightConstantVariable] # variable name -> EnsightConstantVariable timesets: Dict[int, Timeset] # time set ID -> TimeSet
[docs] def get_geometry_model(self, timestep: int = 0) -> EnsightGeometryFile: """ Get geometry for given timestep .. note:: The returned `EnsightGeometryFile` is cached, so it will not parse the file again when you request the geometry again. Args: timestep: number of timestep, starting from zero (for non-transient geometry, use 0) """ return self.geometry_model.get_file(timestep)
[docs] def get_variable(self, name: str, timestep: int = 0) -> EnsightVariableFile: """ Get variable for given timestep .. note:: The returned `EnsightVariableFile` is cached, so it will not parse the file again when you request the variable again. Args: name: name of the variable timestep: number of timestep, starting from zero (for non-transient variable, use 0) """ return self.variables[name].get_file(timestep)
[docs] def get_constant_variable_value(self, name: str, timestep: int = 0) -> float: """ Get value of constant per-case variable for given timestep Args: name: name of the constant variable timestep: number of timestep, starting from zero (for non-transient variable, use 0) """ return self.constant_variables[name].get_value(timestep)
[docs] def is_transient(self) -> bool: """Return True if the case is transient (has at least one time set defined)""" return len(self.timesets) > 0
[docs] def get_node_variables(self) -> List[str]: """Return names of variables defined per-node""" return [name for name, variable in self.variables.items() if variable.variable_location == VariableLocation.PER_NODE]
[docs] def get_element_variables(self) -> List[str]: """Return names of variables defined per-element""" return [name for name, variable in self.variables.items() if variable.variable_location == VariableLocation.PER_ELEMENT]
[docs] def get_constant_variables(self) -> List[str]: """Return names of constant variables defined per-case""" return list(self.constant_variables.keys())
[docs] def get_time_values(self, timeset_id: Optional[int] = None) -> Optional[List[float]]: """ Return time values for given time set This will return time values as specified in the case file (this could be physical time in seconds, or something else). Timesteps ``0``, ``1``, ... occur at times ``case.get_time_values()[0]``, ``case.get_time_values()[1]``, ... Args: timeset_id: time set ID for which to return time values - if there are less than two time sets, you can omit this parameter Returns: List of time values, or None if there are no time sets defined """ if not self.is_transient(): return None timeset_ids = list(self.timesets.keys()) if timeset_id is None: if len(timeset_ids) == 1: timeset_id = timeset_ids[0] else: raise ValueError("Multiple time sets in case, please specify timeset_id explicitly") if timeset_id not in self.timesets: raise KeyError(f"No timeset with id {timeset_id} (present timeset ids: {timeset_ids})") return self.timesets[timeset_id].time_values
[docs] def get_variables(self) -> List[str]: """ Return list of variable names Note that this does not include per-case constants. """ return list(self.variables.keys())
[docs] @classmethod def from_file(cls, casefile_path: Union[str, os.PathLike[str]]) -> "EnsightCaseFile": """ Read EnSight Gold case .. note:: This only reads the casefile, not any data files. Any geometry or variable data that you want to read must be read explicitly. Args: casefile_path: path to the ``*.case`` file """ casefile_dir_path = op.dirname(casefile_path) geometry_model = None dummy_geometry_model = EnsightGeometryFileSet("dummy", None, "dummy") geometry_model_ts: Optional[int] = None variables: Dict[str, EnsightVariableFileSet] = {} constant_variables: Dict[str, EnsightConstantVariable] = {} variables_ts: Dict[str, Optional[int]] = {} timesets: Dict[int, Timeset] = {} with open(casefile_path) as fp: current_section = None current_timeset: Optional[Timeset] = None current_timeset_file_start_number = None current_timeset_filename_increment = None changing_geometry_per_part = False last_key = None for lineno, line in enumerate(fp, 1): line = line.strip() if not line or line.startswith("#"): continue if line.isupper(): current_section = line continue if ":" in line: key, values_ = line.split(":", maxsplit=1) values: List[str] = values_.split() last_key = key else: key = None values = line.split() if current_section == "FORMAT": if key == "type" and values != ["ensight", "gold"]: raise EnsightReaderError("Expected 'ensight gold' in type line", fp, lineno) elif current_section == "GEOMETRY": if key == "model": if "change_coords_only" in values: raise EnsightReaderError("Model with 'change_coords_only' is not supported", fp, lineno) if values[-1] == "changing_geometry_per_part": changing_geometry_per_part = True values.pop() if len(values) == 1: filename, = values filename = strip_quotes(filename) if "*" in filename: corrected_line = f"{key}: 1 {' '.join(values)}" warnings.warn(EnsightReaderWarning( f"Geometry model looks transient, but no timeset is given (did you mean: '{corrected_line}'?)", fp, lineno )) geometry_model = EnsightGeometryFileSet( casefile_dir_path, timeset=None, filename=filename, changing_geometry_per_part=changing_geometry_per_part) elif len(values) == 2: ts_, filename = values filename = strip_quotes(filename) geometry_model_ts = int(ts_) geometry_model = EnsightGeometryFileSet( casefile_dir_path, timeset=None, # timeset will be added later filename=filename, changing_geometry_per_part=changing_geometry_per_part) else: raise EnsightReaderError("Unsupported model definition (note: fs is not supported)", fp, lineno) # note: measured, match, boundary geometries are not supported elif current_section == "VARIABLE": try: variable_type_, variable_location_ = key.split(" per ") # type: ignore[union-attr] if variable_type_ == "constant" and variable_location_.startswith("case"): constant_variable, ts = EnsightConstantVariable.from_casefile_line(key, values, casefile_dir_path) # type: ignore[arg-type] constant_variables[constant_variable.variable_name] = constant_variable variables_ts[constant_variable.variable_name] = ts continue variable_type = VariableType(variable_type_) variable_location = VariableLocation(variable_location_) except ValueError: print(f"Warning: unsupported variable line ({casefile_path}:{lineno}), skipping") continue filename = values[-1] filename = strip_quotes(filename) description = values[-2] ts = None if len(values) == 3: ts = int(values[0]) elif len(values) > 3: print(f"Warning: unsupported variable line ({casefile_path}:{lineno}), skipping") continue if "*" in filename and ts is None: corrected_line = f"{key}: 1 {' '.join(values)}" warnings.warn(EnsightReaderWarning( f"Variable {description} looks transient, but no timeset is given (did you mean: '{corrected_line}'?)", fp, lineno )) variables[description] = EnsightVariableFileSet(casefile_dir_path, timeset=None, # timeset will be added later variable_location=variable_location, variable_type=variable_type, variable_name=description, filename=filename, geometry_model=dummy_geometry_model) # added later variables_ts[description] = ts elif current_section == "TIME": if key == "time set": ts = int(values[0]) description = None if len(values) == 2: description = values[1] timesets[ts] = current_timeset = Timeset( timeset_id=ts, description=description, number_of_steps=-1, filename_numbers=[], time_values=[]) current_timeset_file_start_number = None current_timeset_filename_increment = None elif key == "number of steps": current_timeset.number_of_steps = int(values[0]) # type: ignore[union-attr] elif key == "filename start number": current_timeset_file_start_number = int(values[0]) if current_timeset_file_start_number is not None and current_timeset_filename_increment is not None: assert current_timeset is not None, "'filename start number' line should come after 'time set' line" current_timeset.filename_numbers = Timeset.filename_numbers_from_arithmetic_sequence( file_start_number=current_timeset_file_start_number, number_of_steps=current_timeset.number_of_steps, filename_increment=current_timeset_filename_increment ) elif key == "filename increment": current_timeset_filename_increment = int(values[0]) if current_timeset_file_start_number is not None and current_timeset_filename_increment is not None: current_timeset.filename_numbers = Timeset.filename_numbers_from_arithmetic_sequence( # type: ignore[union-attr] file_start_number=current_timeset_file_start_number, number_of_steps=current_timeset.number_of_steps, # type: ignore[union-attr] filename_increment=current_timeset_filename_increment ) elif key == "time values": current_timeset.time_values.extend(map(float, values)) # type: ignore[union-attr] elif key == "filename numbers": current_timeset.filename_numbers.extend(map(int, values)) # type: ignore[union-attr] elif key == "filename numbers file": path = op.join(casefile_dir_path, values[0]) path = strip_quotes(path) current_timeset.filename_numbers = read_numbers_from_text_file(path, int) # type: ignore[union-attr] elif key == "time values file": path = op.join(casefile_dir_path, values[0]) path = strip_quotes(path) current_timeset.time_values = read_numbers_from_text_file(path, float) # type: ignore[union-attr] elif key is None: if last_key == "time values": current_timeset.time_values.extend(map(float, values)) # type: ignore[union-attr] elif last_key == "filename numbers": current_timeset.filename_numbers.extend(map(int, values)) # type: ignore[union-attr] else: print(f"Warning: unsupported time line ({casefile_path}:{lineno}), skipping") continue else: print(f"Warning: unsupported time line ({casefile_path}:{lineno}), skipping") continue else: print(f"Warning: unsupported section ({casefile_path}:{lineno}), skipping") continue if geometry_model is None: raise EnsightReaderError("No model defined in casefile", fp) # propagate timesets to geometry and variables default_ts = min(timesets.keys(), default=None) if geometry_model_ts is None and "*" in geometry_model.filename and default_ts is not None: warnings.warn(EnsightReaderWarning( f"Geometry model looks transient, but no timeset is given - assuming it uses timeset {default_ts}", file_path=casefile_path )) geometry_model_ts = default_ts if geometry_model_ts is not None: geometry_model.timeset = timesets[geometry_model_ts] for variable_name, variable_ts in variables_ts.items(): if variable_name in variables: variable = variables[variable_name] if variable_ts is None and "*" in variable.filename: warnings.warn(EnsightReaderWarning( f"Variable {variable_name} looks transient, but no timeset is given - assuming it uses timeset {default_ts}", file_path=casefile_path )) variable_ts = default_ts if variable_ts is not None: variable.timeset = timesets[variable_ts] elif variable_name in constant_variables: constant_variable = constant_variables[variable_name] if variable_ts is not None: constant_variable.timeset = timesets[variable_ts] # propagate geometry model to variables for variable in variables.values(): variable.geometry_model = geometry_model return cls( casefile_path=str(casefile_path), geometry_model=geometry_model, variables=variables, constant_variables=constant_variables, timesets=timesets, )
[docs] @classmethod def create_empty_case(cls, path: Union[str, os.PathLike[str]], geofile_name: Optional[str] = None) -> "EnsightCaseFile": """ Create new .case file and geometry file This creates empty non-transient case with no parts or variables. Args: path: Path to .case file to be created geofile_name: If given, this filename will be used for newly created geometry file. Otherwise the filename is derived from casename, replacing ``.case`` with ``.geo``. Returns: `EnsightCaseFile` instance """ if geofile_name is None: geofile_name = f"{op.splitext(op.basename(path))[0]}.geo" geofile_path = op.join(op.dirname(path), geofile_name) with open(path, "w") as fp: print("FORMAT", file=fp) print("type: ensight gold", file=fp) print(file=fp) print("GEOMETRY", file=fp) print(f"model: {geofile_name}", file=fp) with open(geofile_path, "wb") as fp: EnsightGeometryFile.write_header(fp) return cls.from_file(path)
[docs] def to_string(self) -> str: """ Return .case file contents as a string This method is useful if you wish to modify the case, eg. add/remove variables, apply offset to time values, etc. .. note:: This method works by serializing the internal representation, meaning that any lines in the original .case file that were skipped due to missing support from the library (as well as comments) will not appear in the output at all. """ case_lines = [ "FORMAT", "type: ensight gold", "", ] # model case_lines.append("GEOMETRY") model_line = ["model:"] if self.geometry_model.timeset: model_line.append(str(self.geometry_model.timeset.timeset_id)) model_line.append(self.geometry_model.filename) if self.geometry_model.changing_geometry_per_part: model_line.append("changing_geometry_per_part") case_lines.append(" ".join(model_line)) case_lines.append("") # variables if self.variables: case_lines.append("VARIABLE") for constant_variable in self.constant_variables.values(): variable_line = ["constant per case:"] if constant_variable.timeset: variable_line.append(str(constant_variable.timeset.timeset_id)) variable_line.append(constant_variable.variable_name) variable_line.append(" ".join(f"{x:g}" for x in constant_variable.values)) case_lines.append(" ".join(variable_line)) for variable in self.variables.values(): variable_line = [f"{variable.variable_type} per {variable.variable_location}:"] if variable.timeset: variable_line.append(str(variable.timeset.timeset_id)) variable_line.append(variable.variable_name) variable_line.append(variable.filename) case_lines.append(" ".join(variable_line)) case_lines.append("") # time if self.timesets: case_lines.append("TIME") for timeset in self.timesets.values(): case_lines.append(f"time set: {timeset.timeset_id}{' '+timeset.description if timeset.description else ''}") case_lines.append(f"number of steps: {timeset.number_of_steps}") if len(timeset.filename_numbers) >= 2: timeset_file_start_number = timeset.filename_numbers[0] timeset_filename_increment = timeset.filename_numbers[1] - timeset.filename_numbers[0] else: timeset_file_start_number = 0 timeset_filename_increment = 1 if timeset.filename_numbers == Timeset.filename_numbers_from_arithmetic_sequence( file_start_number=timeset_file_start_number, number_of_steps=timeset.number_of_steps, filename_increment=timeset_filename_increment ): case_lines.append(f"filename start number: {timeset_file_start_number}") case_lines.append(f"filename increment: {timeset_filename_increment}") else: case_lines.append("filename numbers:") for i in range(0, len(timeset.filename_numbers), 6): case_lines.append(" ".join(f"{x}" for x in timeset.filename_numbers[i:i+6])) case_lines.append("time values:") for i in range(0, len(timeset.time_values), 6): case_lines.append(" ".join(f"{x:g}" for x in timeset.time_values[i:i+6])) return "\n".join(case_lines)
[docs] def to_file(self, casefile_path: Union[str, os.PathLike[str]]) -> None: """ Write EnSight Gold case to file See `EnsightCaseFile.to_string()` for more. Args: casefile_path: path to the ``*.case`` file """ text = self.to_string() with open(casefile_path, "w") as fp: fp.write(text)
[docs] def append_part_geometry( self, source_case: "EnsightCaseFile", parts: List[GeometryPart], timestep: int = 0 ) -> None: """ Append parts from different case to this case The parts must have names that do not occur in this case. Part IDs will be assigned automatically so that they do not collide with existing parts. Variable data is not copied. See Also: `EnsightCaseFile.copy_part_variables()` Usage: >>> import ensightreader >>> >>> source_case = ensightreader.read_case("source.case") >>> dest_case = ensightreader.read_case("dest.case") >>> >>> source_geo = source_case.get_geometry_model() >>> dest_geo = dest_case.get_geometry_model() >>> >>> source_part = source_geo.get_part_by_name("my_part") >>> dest_case.append_part_geometry(source_case, [source_part]) Args: source_case: Case with geometry to be copied parts: Which `GeometryPart` to copy timestep: Which timestep to use (only set for transient geometry) """ source_geo = source_case.get_geometry_model(timestep) dest_geo = self.get_geometry_model(timestep) highest_dest_part_id = max(dest_geo.parts.keys(), default=0) for source_part in parts: if dest_geo.get_part_by_name(source_part.part_name) is not None: raise ValueError(f"Part with name {source_part.part_name!r} already present in destination case") with source_geo.mmap() as source_mm, dest_geo.open_writeable() as dest_fp: for i, source_part in enumerate(parts, 1): source_part.write_part( source_mm, dest_fp, out_node_id_handling=dest_geo.node_id_handling, out_element_id_handling=dest_geo.element_id_handling, out_part_id=highest_dest_part_id+i ) dest_geo.reload_from_file()
[docs] def copy_part_variables( self, source_case: "EnsightCaseFile", parts: List[GeometryPart], variables: List[str], timestep: int = 0, output_filenames: Optional[List[str]] = None, ) -> None: """ Copy given variables for given parts from different case Parts must exist in both cases (use `EnsightCaseFile.append_part_geometry()` if needed). Variables may not exist in this (destination) case, they will be created as necessary. Variables must be defined for all source parts. Usage: >>> import ensightreader >>> >>> source_case = ensightreader.read_case("source.case") >>> dest_case = ensightreader.read_case("dest.case") >>> >>> source_geo = source_case.get_geometry_model() >>> dest_geo = dest_case.get_geometry_model() >>> >>> source_part = source_geo.get_part_by_name("my_part") >>> dest_case.append_part_geometry(source_case, [source_part]) >>> dest_case.copy_part_variables(source_case, [source_part], ["velocity", "pressure"]) Args: source_case: Case to copy variable data from parts: List of `GeometryPart` for the parts (from this case or source case) variables: Names of variables to be copied timestep: Timestep number (note: creating transient variables is currently not supported; they must exist in both cases for this to work) output_filenames: If given, it must be a list of the same length as variables. When i-th variable file does not exist, its filename will be ``output_filenames[i]`` (default: use the same name as source variable filename). See Also: `EnsightCaseFile.append_part_geometry()` """ source_geo = source_case.get_geometry_model(timestep) dest_geo = self.get_geometry_model(timestep) if output_filenames is not None and len(output_filenames) != len(variables): raise ValueError("output_filenames and variables lists must have the same length") for i, variable_name in enumerate(variables): source_variable = source_case.get_variable(variable_name, timestep) if variable_name in self.variables: dest_variable = self.get_variable(variable_name, timestep) else: if timestep != 0: raise ValueError("Creating transient variables from scratch is not implemented") if output_filenames is None: output_filename = op.basename(source_variable.file_path) else: output_filename = output_filenames[i] dest_variable = self.define_variable( source_variable.variable_location, source_variable.variable_type, variable_name, output_filename ) # [*] first, append any undefined parts to the variable file (this cannot be done via mmap_writable!) with dest_variable.open_writeable() as dest_fp: for tmp_part in parts: dest_part = dest_geo.get_part_by_name(tmp_part.part_name) assert dest_part is not None dest_variable.ensure_data_for_part(dest_fp, dest_part.part_id) dest_variable.reload_from_file() with source_variable.mmap() as source_mm, dest_variable.mmap_writable() as dest_mm: for tmp_part in parts: source_part = source_geo.get_part_by_name(tmp_part.part_name) dest_part = dest_geo.get_part_by_name(tmp_part.part_name) assert source_part is not None assert dest_part is not None # all parts are guaranteed to exist in the variable file, per [*] if dest_variable.variable_location == VariableLocation.PER_NODE: arr = dest_variable.read_node_data(dest_mm, dest_part.part_id) assert arr is not None arr[:] = source_variable.read_node_data(source_mm, source_part.part_id) elif dest_variable.variable_location == VariableLocation.PER_ELEMENT: for block in dest_part.element_blocks: arr = dest_variable.read_element_data(dest_mm, dest_part.part_id, block.element_type) assert arr is not None arr[:] = source_variable.read_element_data(source_mm, source_part.part_id, block.element_type) else: raise NotImplementedError("unexpected variable location")
[docs] def define_variable( self, variable_location: VariableLocation, variable_type: VariableType, variable_name: str, filename: str, ) -> EnsightVariableFile: """ Add new variable to case and save updated .case file This will add new non-transient variable that is not defined for any part. Usage: >>> from ensightreader import read_case, VariableLocation, VariableType >>> case = read_case("sphere.case") >>> variable = case.define_variable(VariableLocation.PER_NODE, VariableType.VECTOR, "velocity", "velocity.bin") Args: variable_location: where the variable is defined (elements or nodes) variable_type: type of the variable (scalar, ...) variable_name: name of the variable ('description' field in casefile) filename: path to the data file Returns: `EnsightVariableFile` for the new variable See Also: `EnsightVariableFile.ensure_data_for_all_parts()` """ if variable_name in self.variables: raise ValueError(f"Variable with name {variable_name!r} is already present in case") if "*" in filename: raise ValueError("Variable filename must not contain **** placeholders (defining transient variables is currently not supported)") casefile_dir_path = op.dirname(self.casefile_path) path = op.join(casefile_dir_path, filename) variable = EnsightVariableFile( file_path=path, description_line=variable_name, variable_name=variable_name, variable_type=variable_type, variable_location=variable_location, geometry_file=self.get_geometry_model(), part_offsets={}, part_element_offsets={}, part_per_element_undefined_values={}, part_per_node_undefined_values={}, ) with open(variable.file_path, "w"): pass # touch the file with variable.open_writeable() as fp: variable.write_header(fp, variable.description_line) variable_fileset = EnsightVariableFileSet( casefile_dir_path=casefile_dir_path, timeset=None, variable_location=variable_location, variable_type=variable_type, variable_name=variable_name, filename=filename, geometry_model=self.geometry_model, ) self.variables[variable_name] = variable_fileset self.to_file(self.casefile_path) return self.get_variable(variable_name)
[docs] def affine_transform(self, m: Float32NDArray, geometry: bool = True, variables: bool = True) -> None: """ Apply affine transformation See Also: `apply_affine_transform()` Args: m: 4x4 affine transformation matrix geometry: True if geometry nodes should be transformed variables: True if variable data should be transformed """ if geometry: self.geometry_model.visit(_AffineTransformGeometryVisitor(m)) if variables: for variable in self.variables.values(): variable.visit(_AffineTransformVariableVisitor(m))
[docs] def clear_cache(self) -> None: """ Remove caches of parsed file structure After calling this, `EnsightCaseFile.get_geometry_model()`, `EnsightCaseFile.get_variable()` will parse source files again. """ self.geometry_model._clear_cache() for variable in self.variables.values(): variable._clear_cache()
[docs] def read_case(path: Union[str, os.PathLike[str]]) -> EnsightCaseFile: """ Read EnSight Gold case This will only parse the case file, not any data files. Use the returned object to load whichever data you need. Args: path: Path to the ``*.case`` file Returns: `EnsightCaseFile` object """ return EnsightCaseFile.from_file(path)