"""Envelope events"""
from __future__ import annotations
import bisect
import typing
import warnings
from scipy import integrate
from mutwo import core_constants
from mutwo import core_events
from mutwo import core_parameters
from mutwo import core_utilities
# Names exported by this module.
__all__ = ("Envelope", "RelativeEnvelope", "TempoEnvelope")
# Type variable for the event type stored in an envelope; it is bound to
# 'core_events.abc.Event' and used as the generic parameter of 'Envelope'.
T = typing.TypeVar("T", bound=core_events.abc.Event)
class Envelope(
    core_events.SequentialEvent,
    typing.Generic[T],
    class_specific_side_attribute_tuple=(
        "event_to_parameter",
        "event_to_curve_shape",
        "value_to_parameter",
        "parameter_to_value",
        "apply_parameter_on_event",
        "apply_curve_shape_on_event",
        "default_event_class",
        "initialise_default_event_class",
    ),
):
    """Model continuous changing values (e.g. glissandi, crescendo).

    :param event_iterable_or_point_sequence: An iterable filled with events
        or with points. If the sequence is filled with points, the points
        will be converted to events. A point is a sequence of the form
        ``(absolute_time, parameter)`` or
        ``(absolute_time, parameter, curve_shape)``; a missing curve shape
        is treated as 0. Each event represents a point in a
        two dimensional graph where the x-axis represents time and the y-axis
        a changing value. Any event class can be used. It is
        more important that the used event classes fit with the functions
        passed in the following parameters.
    :type event_iterable_or_point_sequence: typing.Iterable[T]
    :param tempo_envelope: Forwarded unchanged to the initialiser of
        :class:`core_events.SequentialEvent`.
    :type tempo_envelope: typing.Optional[core_events.TempoEnvelope]
    :param event_to_parameter: A function which receives an event and has to
        return a parameter object (any object). By default the function will
        ask the event for its `value` property. If the property can't be found
        it will return 0.
    :type event_to_parameter: typing.Callable[[core_events.abc.Event], core_constants.ParameterType]
    :param event_to_curve_shape: A function which receives an event and has
        to return a curve_shape. A curve_shape is either a float, an integer
        or a fraction. For a curve_shape = 0 a linear transition between two
        points is created. For a curve_shape > 0 the envelope changes slower
        at the beginning and faster at the end, for a curve_shape < 0 it is
        the inverse behaviour. The default function will ask the event for its
        `curve_shape` property. If the property can't be found
        it will return 0.
    :type event_to_curve_shape: typing.Callable[[core_events.abc.Event], CurveShape]
    :param parameter_to_value: Convert a parameter to a value. A value is any
        object which supports mathematical operations.
    :type parameter_to_value: typing.Callable[[core_constants.ParameterType], Value]
    :param value_to_parameter: A callable object which converts a value to a parameter.
    :type value_to_parameter: typing.Callable[[Value], core_constants.ParameterType]
    :param apply_parameter_on_event: A callable object which applies a parameter on an event.
    :type apply_parameter_on_event: typing.Callable[[core_events.abc.Event, core_constants.ParameterType], None]
    :param apply_curve_shape_on_event: A callable object which applies a curve shape on an event.
    :type apply_curve_shape_on_event: typing.Callable[[core_events.abc.Event, CurveShape], None]
    :param default_event_class: The default event class which describes a point.
    :type default_event_class: type[core_events.abc.Event]
    :param initialise_default_event_class: A callable object which receives
        the default event class and a duration and returns a new event.
        It is called whenever a point has to be converted to an event.
    :type initialise_default_event_class: typing.Callable[[type[core_events.abc.Event], core_constants.DurationType], core_events.abc.Event]

    This class is inspired by Marc Evanstein's `Envelope` class in his
    `expenvelope <https://git.sr.ht/~marcevanstein/expenvelope>`_
    python package and is made to fit better into the `mutwo` ecosystem.

    **Example:**

    >>> from mutwo import core_events
    >>> core_events.Envelope([[0, 0, 1], [0.5, 1]])
    Envelope([SimpleEvent(curve_shape = 1, duration = 0.5, value = 0), SimpleEvent(curve_shape = 0, duration = 0.0, value = 1)])
    """

    # NOTE: 'RelativeEnvelope' builds its own docstring by slicing the
    # docstring above line by line: it skips the first two lines and treats
    # everything up to the first empty line as the parameter section.
    # Therefore keep the layout "summary, empty line, parameter block
    # without any empty line, empty line, remaining text".

    # Type definitions
    Value = core_constants.Real
    CurveShape = core_constants.Real
    IncompletePoint = tuple[core_constants.DurationType, core_constants.ParameterType]
    CompletePoint = tuple[
        core_constants.DurationType, core_constants.ParameterType, CurveShape  # type: ignore
    ]
    Point = typing.Union[CompletePoint, IncompletePoint]

    def __init__(
        self,
        event_iterable_or_point_sequence: typing.Union[
            typing.Iterable[T], typing.Sequence[Point]
        ],
        tempo_envelope: typing.Optional[core_events.TempoEnvelope] = None,
        event_to_parameter: typing.Callable[
            [core_events.abc.Event], core_constants.ParameterType
        ] = lambda event: getattr(
            event, core_events.configurations.DEFAULT_PARAMETER_ATTRIBUTE_NAME
        )
        if hasattr(event, core_events.configurations.DEFAULT_PARAMETER_ATTRIBUTE_NAME)
        else 0,
        event_to_curve_shape: typing.Callable[
            [core_events.abc.Event], CurveShape
        ] = lambda event: getattr(
            event, core_events.configurations.DEFAULT_CURVE_SHAPE_ATTRIBUTE_NAME
        )
        if hasattr(event, core_events.configurations.DEFAULT_CURVE_SHAPE_ATTRIBUTE_NAME)
        else 0,
        parameter_to_value: typing.Callable[
            [core_constants.ParameterType], Value
        ] = lambda parameter: parameter,
        value_to_parameter: typing.Callable[
            [Value], core_constants.ParameterType
        ] = lambda value: value,
        apply_parameter_on_event: typing.Callable[
            [core_events.abc.Event, core_constants.ParameterType], None
        ] = lambda event, parameter: setattr(
            event,
            core_events.configurations.DEFAULT_PARAMETER_ATTRIBUTE_NAME,
            parameter,
        ),
        apply_curve_shape_on_event: typing.Callable[
            [core_events.abc.Event, CurveShape], None
        ] = lambda event, curve_shape: setattr(
            event,
            core_events.configurations.DEFAULT_CURVE_SHAPE_ATTRIBUTE_NAME,
            curve_shape,
        ),
        default_event_class: type[core_events.abc.Event] = core_events.SimpleEvent,
        initialise_default_event_class: typing.Callable[
            [type[core_events.abc.Event], core_constants.DurationType],
            core_events.abc.Event,
        ] = lambda simple_event_class, duration: simple_event_class(
            duration
        ),  # type: ignore
    ):
        # The converter callables have to be stored before the points are
        # converted, because the conversion ('_make_event') already uses them.
        self.event_to_parameter = event_to_parameter
        self.event_to_curve_shape = event_to_curve_shape
        self.value_to_parameter = value_to_parameter
        self.parameter_to_value = parameter_to_value
        self.apply_parameter_on_event = apply_parameter_on_event
        self.apply_curve_shape_on_event = apply_curve_shape_on_event
        self.default_event_class = default_event_class
        self.initialise_default_event_class = initialise_default_event_class

        event_iterable = self._event_iterable_or_point_sequence_to_event_iterable(
            event_iterable_or_point_sequence
        )
        super().__init__(event_iterable, tempo_envelope)

    # ###################################################################### #
    #                         public class methods                           #
    # ###################################################################### #

    @classmethod
    def from_points(
        cls,
        *point: Point,
        **kwargs,
    ) -> Envelope:
        """Create an envelope from points passed as positional arguments.

        :param point: The points which define the envelope. All points are
            collected and passed as first argument to the class.
        :param kwargs: Keyword arguments which are forwarded to the
            initialiser of the class.
        """
        return cls(point, **kwargs)

    # ###################################################################### #
    #                            magic methods                               #
    # ###################################################################### #

    @typing.overload  # type: ignore
    def __setitem__(self, index_or_slice: int, event_or_sequence: T):
        ...

    @typing.overload
    def __setitem__(
        self,
        index_or_slice: slice,
        event_or_sequence: typing.Union[
            typing.Iterable[T], typing.Iterable[Envelope.Point]
        ],
    ):
        ...

    def __setitem__(
        self,
        index_or_slice: typing.Union[int, slice],
        event_or_sequence: typing.Union[
            T, typing.Iterable[T], typing.Iterable[Envelope.Point]
        ],
    ):
        """Set one event or a slice of events / points.

        If a slice is assigned with an iterable, the iterable may contain
        points instead of events; points are converted to events first.
        """
        if isinstance(index_or_slice, slice) and isinstance(
            event_or_sequence, typing.Iterable
        ):
            event_or_sequence = self._event_iterable_or_point_sequence_to_event_iterable(  # type: ignore
                event_or_sequence  # type: ignore
            )
        super().__setitem__(index_or_slice, event_or_sequence)  # type: ignore

    # ###################################################################### #
    #                        private static methods                          #
    # ###################################################################### #

    @staticmethod
    def _point_sequence_to_corrected_point_list(
        point_or_invalid_type_sequence: typing.Sequence[typing.Union[Point, typing.Any]]
    ) -> list[typing.Union[Envelope.CompletePoint, None]]:
        """Return a list of complete points (time, parameter, curve shape).

        Points with two items get a curve shape of 0 appended. The passed
        points are never modified: each corrected point is a new tuple
        (an in-place ``+=`` would silently extend points given as lists).

        :raises core_utilities.InvalidPointError: If a point has neither
            two nor three items.
        """
        corrected_point_list: list[typing.Union[Envelope.CompletePoint, None]] = []
        for point in point_or_invalid_type_sequence:
            point_count = len(point)
            if point_count == 2:
                corrected_point = (*point, 0)
            elif point_count == 3:
                corrected_point = tuple(point)
            else:
                raise core_utilities.InvalidPointError(point, point_count)
            corrected_point_list.append(corrected_point)  # type: ignore
        return corrected_point_list

    # ###################################################################### #
    #                           private methods                              #
    # ###################################################################### #

    def _make_event(self, duration, parameter, curve_shape):
        """Create a new event with given duration, parameter and curve shape.

        The event is built with 'initialise_default_event_class'; parameter
        and curve shape are applied with the respective 'apply_*' callables.
        """
        event = self.initialise_default_event_class(self.default_event_class, duration)
        self.apply_parameter_on_event(event, parameter)
        self.apply_curve_shape_on_event(event, curve_shape)
        return event

    def _point_sequence_to_event_list(
        self,
        point_or_invalid_type_sequence: typing.Sequence[
            typing.Union[Point, typing.Any]
        ],
    ) -> list[core_events.abc.Event]:
        """Convert points to events.

        The duration of each event is the distance between the absolute time
        of its point and the absolute time of the next point. The event of
        the last point gets a duration of ``absolute_time - absolute_time``.
        """
        corrected_point_list = Envelope._point_sequence_to_corrected_point_list(
            point_or_invalid_type_sequence
        )
        # 'None' is a sentinel which marks "there is no next point", so that
        # the last point is also processed by the pairwise iteration below.
        corrected_point_list.append(None)
        event_list = []
        for point0, point1 in zip(corrected_point_list, corrected_point_list[1:]):
            if point0 is not None:
                absolute_time0, value_or_parameter, curve_shape = point0
            else:
                raise TypeError("Found unexpected position of None in provided points.")
            if point1:
                absolute_time1 = point1[0]
                # Points have to be sorted by time.
                assert absolute_time1 >= absolute_time0
            else:
                absolute_time1 = absolute_time0
            duration = absolute_time1 - absolute_time0
            event = self._make_event(duration, value_or_parameter, curve_shape)
            event_list.append(event)
        return event_list

    def _event_iterable_or_point_sequence_to_event_iterable(
        self,
        event_iterable_or_point_sequence: typing.Union[
            typing.Iterable[T], typing.Sequence[Point]
        ],
    ) -> typing.Iterable[core_events.abc.Event]:
        """Return an iterable of events for either events or points.

        :raises TypeError: If the input mixes events with other objects.
        """
        # A one-shot iterator (e.g. a generator) would be exhausted by the
        # type inspection below, so nothing would be left to build the
        # envelope from. Materialise it first; sequences are passed through
        # untouched.
        if isinstance(event_iterable_or_point_sequence, typing.Iterator):
            event_iterable_or_point_sequence = list(event_iterable_or_point_sequence)

        item_type_list = [
            isinstance(event_or_point, core_events.abc.Event)
            for event_or_point in event_iterable_or_point_sequence
        ]
        if all(item_type_list):
            # Only events (or nothing at all): nothing to convert.
            event_iterable = event_iterable_or_point_sequence
        elif any(item_type_list):
            raise TypeError(
                "Found inconsistent iterable with mixed types. "
                "Please only use events or only use points for "
                "'event_iterable_or_point_sequence'. First 200 "
                "characters of the problematic iterable: \n"
                f"{str(event_iterable_or_point_sequence)[:200]}"
            )
        else:
            event_iterable = self._point_sequence_to_event_list(
                event_iterable_or_point_sequence  # type: ignore
            )
        return event_iterable  # type: ignore

    def _event_to_value(self, event: core_events.abc.Event) -> Value:
        """Extract the parameter of an event and convert it to a value."""
        return self.parameter_to_value(self.event_to_parameter(event))

    # ###################################################################### #
    #                          public properties                             #
    # ###################################################################### #

    @property
    def parameter_tuple(self) -> tuple[core_constants.ParameterType, ...]:
        """The parameter of each event (via 'event_to_parameter')."""
        return tuple(map(self.event_to_parameter, self))

    @property
    def value_tuple(self) -> tuple[Value, ...]:
        """The value of each event (parameters via 'parameter_to_value')."""
        return tuple(map(self.parameter_to_value, self.parameter_tuple))

    @property
    def curve_shape_tuple(self) -> tuple[CurveShape, ...]:
        """The curve shape of each event (via 'event_to_curve_shape')."""
        return tuple(map(self.event_to_curve_shape, self))

    @property
    def is_static(self) -> bool:
        """Return `True` if :class:`Envelope` only has one static value."""
        return len(set(self.value_tuple)) <= 1

    # ###################################################################### #
    #                            public methods                              #
    # ###################################################################### #

    def value_at(
        self, absolute_time: typing.Union[core_parameters.abc.Duration, typing.Any]
    ) -> Value:
        """Get the value of the envelope at the given point in time.

        At or before the first point the value of the first event is
        returned, at or after the last point the value of the last event.
        In between the value is interpolated between the two surrounding
        points with respect to the curve shape of the earlier point.

        :param absolute_time: Absolute position in time. Converted to a
            duration with ``UNKNOWN_OBJECT_TO_DURATION``.
        :type absolute_time: typing.Union[core_parameters.abc.Duration, typing.Any]
        :raises core_utilities.EmptyEnvelopeError: If the envelope is empty.
        """
        absolute_time = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(
            absolute_time
        )
        absolute_time_tuple = self.absolute_time_tuple

        try:
            use_only_first_event = absolute_time <= absolute_time_tuple[0]
        except IndexError:
            raise core_utilities.EmptyEnvelopeError(self, "value_at")

        use_only_last_event = absolute_time >= (
            # If the duration of the last event == 0 there is the danger
            # of floating point errors (the value in absolute_time_tuple could
            # be slightly higher than the duration of the Envelope. If this
            # happens the function will raise an AssertionError, because
            # "_get_index_at_from_absolute_time_tuple" will return
            # "None"). With explicitly testing if the last duration
            # equals 0 we can avoid this danger.
            absolute_time_tuple[-1]
            if self[-1].duration > 0
            else self.duration
        )

        if use_only_first_event or use_only_last_event:
            index = 0 if use_only_first_event else -1
            return self._event_to_value(self[index])

        event_0_index = self._get_index_at_from_absolute_time_tuple(
            absolute_time, absolute_time_tuple, self.duration
        )
        assert event_0_index is not None

        value0, value1 = (
            self._event_to_value(self[event_0_index + n]) for n in range(2)
        )
        curve_shape = self.event_to_curve_shape(self[event_0_index])

        return core_utilities.scale(
            absolute_time.duration_in_floats,
            absolute_time_tuple[event_0_index].duration_in_floats,
            absolute_time_tuple[event_0_index + 1].duration_in_floats,
            value0,
            value1,
            curve_shape,
        )

    def parameter_at(
        self, absolute_time: typing.Union[core_parameters.abc.Duration, typing.Any]
    ) -> core_constants.ParameterType:
        """Get the parameter of the envelope at the given point in time.

        This is the result of :func:`value_at` converted with
        'value_to_parameter'.
        """
        return self.value_to_parameter(self.value_at(absolute_time))

    @core_utilities.add_copy_option
    def sample_at(
        self,
        absolute_time: typing.Union[core_parameters.abc.Duration, typing.Any],
        append_duration: typing.Union[
            core_parameters.abc.Duration, typing.Any
        ] = core_parameters.DirectDuration(0),
    ) -> Envelope:
        """Discretize envelope at given time

        :param absolute_time: Position in time where the envelope should
            define a new event.
        :type absolute_time: typing.Union[core_parameters.abc.Duration, typing.Any]
        :param append_duration: In case we add a new control point after any
            already defined point, the duration of this control point will be
            equal to "append_duration". Default to core_parameters.DirectDuration(0)
        :raises core_utilities.EmptyEnvelopeError: If the envelope is empty.

        If there is already an event at the given time, nothing is changed.
        """

        def find_duration(
            absolute_time: core_parameters.abc.Duration,
            absolute_time_tuple: tuple[core_parameters.abc.Duration, ...],
        ):
            """Find duration of new control point"""
            next_event_start_index = bisect.bisect_right(
                absolute_time_tuple, absolute_time
            )
            try:
                next_event_start = absolute_time_tuple[next_event_start_index]
            # In case we call "sample_at" at a position after any already
            # specified point.
            except IndexError:
                duration_new_event = append_duration
            else:
                duration_new_event = next_event_start - absolute_time
            return duration_new_event

        def find_curve_shape(
            absolute_time: core_parameters.abc.Duration,
            absolute_time_tuple: tuple[core_parameters.abc.Duration, ...],
            envelope_duration: core_parameters.abc.Duration,
        ):
            """Find curve shape of new control point

            Side effect: the curve shape of the event which is split by the
            new control point is reduced proportionally to the position of
            the split.
            """
            old_event_index = (
                core_events.SequentialEvent._get_index_at_from_absolute_time_tuple(
                    absolute_time, absolute_time_tuple, envelope_duration
                )
            )
            if old_event_index is not None:
                old_event = self[old_event_index]
                curve_shape = self.event_to_curve_shape(old_event)
                curve_shape_old_event = (
                    (absolute_time - absolute_time_tuple[old_event_index])
                    / old_event.duration
                ).duration_in_floats * curve_shape
                curve_shape_new_event = curve_shape - curve_shape_old_event
                self.apply_curve_shape_on_event(old_event, curve_shape_old_event)
            else:
                curve_shape_new_event = 0
            return curve_shape_new_event

        if not self:
            raise core_utilities.EmptyEnvelopeError(self, "sample_at")

        absolute_time, append_duration = (
            core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(unknown_object)
            for unknown_object in (absolute_time, append_duration)
        )

        # We only add a new event in case there isn't any event yet at
        # given point in time.
        if absolute_time not in (absolute_time_tuple := self.absolute_time_tuple):
            envelope_duration = absolute_time_tuple[-1] + self[-1].duration
            # The argument order matters: "parameter_at" has to be evaluated
            # before "find_curve_shape", because the latter changes the curve
            # shape of an already existing event and therefore the
            # interpolation result.
            event = self._make_event(
                find_duration(absolute_time, absolute_time_tuple),
                self.parameter_at(absolute_time),
                find_curve_shape(absolute_time, absolute_time_tuple, envelope_duration),
            )
            try:
                self.squash_in(absolute_time, event)
            # This means we want to squash in at a position much
            # later than any already defined event.
            except core_utilities.InvalidStartValueError:
                difference = absolute_time - envelope_duration
                self[-1].duration += difference
                self.append(event)

        return self

    def integrate_interval(
        self, start: core_constants.DurationType, end: core_constants.DurationType
    ) -> float:
        """Integrate the envelope values between 'start' and 'end'.

        The integral is computed numerically with :func:`scipy.integrate.quad`
        over :func:`value_at`.
        """
        return integrate.quad(self.value_at, start, end)[0]

    def get_average_value(
        self,
        start: typing.Optional[
            typing.Union[core_parameters.abc.Duration, typing.Any]
        ] = None,
        end: typing.Optional[
            typing.Union[core_parameters.abc.Duration, typing.Any]
        ] = None,
    ) -> Value:
        """Find the average value in the given interval.

        :param start: Start of the interval. If `None` it is set to 0.
        :param end: End of the interval. If `None` it is set to the
            duration of the envelope.

        If the interval has a duration of 0, a
        ``InvalidAverageValueStartAndEndWarning`` is emitted and the value at
        'start' is returned. Otherwise the integral of the interval divided
        by its duration is returned.
        """
        if start is None:
            start = core_parameters.DirectDuration(0)
        if end is None:
            end = self.duration
        start, end = (
            core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(unknown_object)
            for unknown_object in (start, end)
        )
        duration = end - start
        if duration == 0:
            # An average over an empty interval isn't defined (division by
            # zero), so fall back to the value at this moment.
            warnings.warn(core_utilities.InvalidAverageValueStartAndEndWarning())
            return self.value_at(start)
        return self.integrate_interval(start, end) / duration.duration

    def get_average_parameter(
        self,
        start: typing.Optional[core_constants.DurationType] = None,
        end: typing.Optional[core_constants.DurationType] = None,
    ) -> core_constants.ParameterType:
        """Find the average parameter in the given interval.

        This is the result of :func:`get_average_value` converted with
        'value_to_parameter'.
        """
        return self.value_to_parameter(self.get_average_value(start, end))

    @core_utilities.add_copy_option
    def cut_out(
        self,
        start: typing.Union[core_parameters.abc.Duration, typing.Any],
        end: typing.Union[core_parameters.abc.Duration, typing.Any],
    ) -> Envelope[T]:
        """Only keep the part of the envelope between 'start' and 'end'.

        Control points are sampled at 'start' and 'end' before the parent
        implementation of 'cut_out' is called. Afterwards the point at 'end'
        is appended with a duration of 0.
        """
        start, end = (
            core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(unknown_object)
            for unknown_object in (start, end)
        )
        self.sample_at(start, append_duration=end - start)
        self.sample_at(end)

        last_point = self.get_event_at(end)
        # In case last_point.duration == 0 "get_event_at" won't return
        # any object. This only happens in case
        #
        #   end > self.duration
        #
        # So the new point will be appended.
        if last_point is None:
            last_point = self[-1]
        assert last_point

        cut_out_envelope = super().cut_out(start, end)
        cut_out_envelope.append(last_point.set("duration", 0))
        return cut_out_envelope

    @core_utilities.add_copy_option
    def cut_off(
        self,
        start: typing.Union[core_parameters.abc.Duration, typing.Any],
        end: typing.Union[core_parameters.abc.Duration, typing.Any],
    ) -> Envelope[T]:
        """Remove the part of the envelope between 'start' and 'end'.

        Nothing happens if 'end' isn't later than 'start'. Otherwise a
        control point with duration 0, which holds the parameter at 'start',
        is inserted at 'start' after the interval has been removed.
        """
        start, end = (
            core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(unknown_object)
            for unknown_object in (start, end)
        )
        if (cut_off_duration := end - start) > 0:
            # XXX: It is sufficient to find the first control point
            # by simply using "parameter_at" instead of "sample_at":
            # We don't need an accurate curve_shape or duration,
            # because this point only exists in an infinitely short
            # moment in time anyway (or better: its main function is
            # to ensure that interpolation from the previous point
            # to this point works as expected).
            parameter_0 = self.parameter_at(start)
            event_0 = self._make_event(0, parameter_0, 0)
            self.sample_at(end)
            self._cut_off(start, end, cut_off_duration)
            self.squash_in(start, event_0)
        return self
class RelativeEnvelope(Envelope, typing.Generic[T]):
    # The docstring is assembled from the docstring of 'Envelope': the first
    # two lines of the parent docstring are dropped, the description of the
    # additional parameter is inserted after the parent's parameter section
    # (everything up to the first empty line) and a closing remark is added.
    #
    # 'Envelope.__doc__' is 'None' if Python runs with '-OO', therefore fall
    # back to an empty string instead of failing while the class is created.
    __parent_doc_string = (Envelope.__doc__ or "").split("\n")[2:]  # type: ignore
    try:
        __after_parameter_text_index = __parent_doc_string.index("")
    except ValueError:
        # No empty line found: treat the whole parent text as parameter
        # section, so that the additional parameter is appended after it.
        __after_parameter_text_index = len(__parent_doc_string)

    # NOTE(review): the leading whitespace of the inserted lines should match
    # the indentation of the lines in the parent docstring - confirm.
    __doc__ = "\n".join(
        ["Envelope with relative durations and values / parameters.\n"]
        + __parent_doc_string[:__after_parameter_text_index]
        + [
            " :param base_parameter_and_relative_parameter_to_absolute_parameter: A function",
            " which runs when the :func:`resolve` is called. It expects the base parameter",
            " and the relative parameter (which is extracted from the envelope events)",
            " and should return an absolute parameter.",
        ]
        + __parent_doc_string[__after_parameter_text_index:]
        + [
            " The :class:`RelativeEnvelope` adds the :func:`resolve` method",
            " to the base class :class:`Envelope`.",
        ]
    )

    def __init__(
        self,
        *args,
        base_parameter_and_relative_parameter_to_absolute_parameter: typing.Callable[
            [core_constants.ParameterType, core_constants.ParameterType],
            core_constants.ParameterType,
        ],
        **kwargs,
    ):
        self.base_parameter_and_relative_parameter_to_absolute_parameter = (
            base_parameter_and_relative_parameter_to_absolute_parameter
        )
        super().__init__(*args, **kwargs)

    def resolve(
        self,
        duration: typing.Union[core_parameters.abc.Duration, typing.Any],
        base_parameter: core_constants.ParameterType,
        resolve_envelope_class: type[Envelope] = Envelope,
    ) -> Envelope:
        """Create an envelope with absolute times and absolute parameters.

        :param duration: The duration to which the relative envelope is
            stretched. Every absolute time of the relative envelope is
            multiplied by ``duration / self.duration``. If this division
            raises a `ZeroDivisionError`, a factor of 0 is used.
        :type duration: typing.Union[core_parameters.abc.Duration, typing.Any]
        :param base_parameter: The parameter which is combined with the
            relative parameter of each event by
            'base_parameter_and_relative_parameter_to_absolute_parameter'.
        :type base_parameter: core_constants.ParameterType
        :param resolve_envelope_class: The class of the returned envelope.
            It is initialised with the list of resolved points only.
        :type resolve_envelope_class: type[Envelope]
        """
        duration = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(duration)
        try:
            duration_factor = duration / self.duration
        except ZeroDivisionError:
            duration_factor = core_parameters.DirectDuration(0)

        point_list = []
        for absolute_time, event in zip(self.absolute_time_tuple, self):
            relative_parameter = self.event_to_parameter(event)
            new_parameter = (
                self.base_parameter_and_relative_parameter_to_absolute_parameter(
                    base_parameter, relative_parameter
                )
            )
            point_list.append(
                (
                    absolute_time * duration_factor,
                    new_parameter,
                    self.event_to_curve_shape(event),
                )
            )
        return resolve_envelope_class(point_list)
class TempoEnvelope(Envelope):
    def __eq__(self, other: typing.Any):
        # XXX: The '__eq__' method inherited from list can't be used here: it
        # would recurse forever, because every event owns a TempoEnvelope
        # and Python would keep comparing the TempoEnvelopes of
        # TempoEnvelopes.
        compared_attribute_name_tuple = (
            "absolute_time_tuple",
            "curve_shape_tuple",
            "value_tuple",
        )
        try:
            # Compare lazily: stop at the first attribute which differs, so
            # that the remaining tuples don't need to be computed.
            for attribute_name in compared_attribute_name_tuple:
                if not (
                    getattr(self, attribute_name) == getattr(other, attribute_name)
                ):
                    return False
        except AttributeError:
            # 'other' doesn't provide the compared attributes, so it can't
            # be equal to a TempoEnvelope.
            return False
        return True