Source code for mutwo.core_events.basic

"""Generic event classes which can be used in multiple contexts.

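The different events differ in their timing structure and whether they
are nested or not:

- :class:`SimpleEvent` is a leaf: it doesn't contain other events.
- :class:`SequentialEvent` contains events which happen in a linear
  order (one after another).
- :class:`SimultaneousEvent` contains events which happen at the same
  time.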
"""

from __future__ import annotations

import bisect
import copy
import functools
import operator
import types
import typing

import ranges

from mutwo import core_constants
from mutwo import core_events
from mutwo import core_parameters
from mutwo import core_utilities


# Names exported by ``from mutwo.core_events.basic import *``.
__all__ = (
    "SimpleEvent",
    "SequentialEvent",
    "SimultaneousEvent",
    "TaggedSimpleEvent",
    "TaggedSequentialEvent",
    "TaggedSimultaneousEvent",
)


class SimpleEvent(core_events.abc.Event):
    """Event-Object which doesn't contain other Event-Objects (the node or leaf).

    :param duration: The duration of the ``SimpleEvent``. Mutwo will convert
        the incoming object to a :class:`mutwo.core_parameters.abc.Duration`
        object with the global
        `core_events.configurations.UNKNOWN_OBJECT_TO_DURATION` callable.
    :param tempo_envelope: Passed on unchanged to the initialiser of the
        parent class. Defaults to ``None``.

    **Example:**

    >>> from mutwo import core_events
    >>> simple_event = core_events.SimpleEvent(2)
    >>> print(simple_event)
    SimpleEvent(duration = DirectDuration(2))
    """

    # Attribute names which are hidden from ``repr``
    # (see ``_parameter_to_print_tuple``).
    parameter_to_exclude_from_representation_tuple = ("tempo_envelope",)

    def __init__(
        self,
        duration: core_parameters.abc.Duration,
        tempo_envelope: typing.Optional[core_events.TempoEnvelope] = None,
    ):
        super().__init__(tempo_envelope)
        # Assignment runs through the ``duration`` property setter, which
        # converts the incoming object.
        self.duration = duration

    # ###################################################################### #
    #                           magic methods                                #
    # ###################################################################### #

    def __eq__(self, other: typing.Any) -> bool:
        """Test for checking if two objects are equal.

        Both objects are compared on the union of the attribute names
        returned by their ``_parameter_to_compare_tuple``. If this
        lookup raises an ``AttributeError`` the objects are considered
        unequal.
        """
        try:
            parameter_to_compare_set = {
                parameter_to_compare
                for object_ in (self, other)
                for parameter_to_compare in object_._parameter_to_compare_tuple
            }
        except AttributeError:
            return False
        return core_utilities.test_if_objects_are_equal_by_parameter_tuple(
            self, other, tuple(parameter_to_compare_set)
        )

    def __repr__(self) -> str:
        """Return ``ClassName(attribute = value, ...)``."""
        attribute_iterator = (
            f"{attribute} = {getattr(self, attribute)}"
            for attribute in self._parameter_to_print_tuple
        )
        return f"{type(self).__name__}({', '.join(attribute_iterator)})"

    # ###################################################################### #
    #                           properties                                   #
    # ###################################################################### #

    @property
    def _parameter_to_print_tuple(self) -> tuple[str, ...]:
        """Return tuple of attribute names which shall be printed for repr."""
        # XXX: Fix infinite circular loop (due to 'tempo_envelope')
        # and avoid printing too verbose parameters.
        excluded_attribute_tuple = self.parameter_to_exclude_from_representation_tuple
        return tuple(
            attribute
            for attribute in self._parameter_to_compare_tuple
            if attribute not in excluded_attribute_tuple
        )

    @property
    def _parameter_to_compare_tuple(self) -> tuple[str, ...]:
        """Return tuple of attribute names which values define the :class:`SimpleEvent`.

        The returned attribute names are used for equality check between two
        :class:`SimpleEvent` objects.
        """
        return tuple(
            attribute
            for attribute in dir(self)
            # XXX: We have to use 'and' (lazy evaluation) instead of
            # 'all', to avoid redundant checks!
            #
            # no private attributes
            if attribute[0] != "_"
            # no redundant comparisons
            and attribute not in ("parameter_to_exclude_from_representation_tuple",)
            # no methods
            and not isinstance(getattr(self, attribute), types.MethodType)
        )

    @property
    def duration(self) -> core_parameters.abc.Duration:
        """The duration of the event."""
        return self._duration

    @duration.setter
    def duration(self, duration: core_parameters.abc.Duration):
        # Any incoming object is converted by the globally configured
        # callable before it is stored.
        self._duration = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(duration)

    # ###################################################################### #
    #                           public methods                               #
    # ###################################################################### #

    def destructive_copy(self) -> SimpleEvent:
        """Return a deep copy of the event."""
        return copy.deepcopy(self)

    def get_parameter(
        self, parameter_name: str, flat: bool = False, filter_undefined: bool = False
    ) -> core_constants.ParameterType:
        """Return the attribute ``parameter_name`` or ``None`` if it is missing.

        :param parameter_name: Name of the attribute to look up.
        :param flat: Accepted but not used by this implementation.
        :param filter_undefined: Accepted but not used by this
            implementation.
        """
        return getattr(self, parameter_name, None)

    @core_utilities.add_copy_option
    def set_parameter(  # type: ignore
        self,
        parameter_name: str,
        object_or_function: typing.Union[
            typing.Callable[
                [core_constants.ParameterType], core_constants.ParameterType
            ],
            core_constants.ParameterType,
        ],
        set_unassigned_parameter: bool = True,
    ) -> SimpleEvent:
        """Sets event parameter to new value.

        :param parameter_name: The name of the parameter which values shall be
            changed.
        :param object_or_function: For setting the parameter either a new value
            can be passed directly or a function can be passed. The function
            gets as an argument the previous value that had been assigned
            to the respective object and has to return a new value that will
            be assigned to the object.
        :param set_unassigned_parameter: If set to ``False`` a new parameter
            will only be assigned to an Event if the Event already has an
            attribute with the respective `parameter_name`. If the Event
            doesn't know the attribute yet and `set_unassigned_parameter` is
            False, the method call will simply be ignored.
        :param mutate: If ``False`` the function will return a copy of the
            given object. If set to ``True`` the object itself will be changed
            and the function will return the changed object. Default to
            ``True``.

        **Example:**

        >>> from mutwo import core_events
        >>> simple_event = core_events.SimpleEvent(2)
        >>> simple_event.set_parameter(
        ...     'duration', lambda old_duration: old_duration * 2
        ... )
        >>> simple_event.duration
        4
        >>> simple_event.set_parameter('duration', 3)
        >>> simple_event.duration
        3
        >>> simple_event.set_parameter(
        ...     'unknown_parameter', 10, set_unassigned_parameter=False
        ... )  # this will be ignored
        >>> simple_event.unknown_parameter
        AttributeError: 'SimpleEvent' object has no attribute 'unknown_parameter'
        >>> simple_event.set_parameter(
        ...     'unknown_parameter', 10, set_unassigned_parameter=True
        ... )  # this will be written
        >>> simple_event.unknown_parameter
        10
        """
        # A missing attribute and an attribute set to ``None`` are both
        # reported as ``None`` by ``get_parameter``.
        old_parameter = self.get_parameter(parameter_name)
        if set_unassigned_parameter or old_parameter is not None:
            # ``callable`` (unlike ``hasattr(..., "__call__")``) is only
            # true for objects which can really be called.
            if callable(object_or_function):
                new_parameter = object_or_function(old_parameter)
            else:
                new_parameter = object_or_function
            setattr(self, parameter_name, new_parameter)

    @core_utilities.add_copy_option
    def mutate_parameter(  # type: ignore
        self,
        parameter_name: str,
        function: typing.Union[
            typing.Callable[[core_constants.ParameterType], None], typing.Any
        ],
    ) -> SimpleEvent:
        """Call ``function`` with the current value of a parameter.

        :param parameter_name: Name of the attribute which is handed to
            ``function``.
        :param function: Called with the current parameter value; its
            return value is ignored. It isn't called if
            ``get_parameter`` returns ``None`` for ``parameter_name``.
        """
        parameter = self.get_parameter(parameter_name)
        if parameter is not None:
            function(parameter)

    def metrize(self, mutate: bool = True) -> SimpleEvent:
        """Apply ``core_converters.EventToMetrizedEvent`` on the event.

        :param mutate: If ``True`` the duration and tempo envelope of the
            converted event are written to this event and this event is
            returned. If ``False`` the converted event is returned and
            this event is left untouched.
        """
        # XXX: import in method to avoid circular import error
        from mutwo import core_converters

        metrized_event = core_converters.EventToMetrizedEvent()(self)
        if not mutate:
            return metrized_event
        self.duration = metrized_event.duration
        self.tempo_envelope = metrized_event.tempo_envelope
        return self

    @core_utilities.add_copy_option
    def cut_out(  # type: ignore
        self,
        start: core_parameters.abc.Duration,
        end: core_parameters.abc.Duration,
    ) -> SimpleEvent:
        """Shorten the event to the part between ``start`` and ``end``.

        :param start: Start of the range to keep.
        :param end: End of the range to keep.
        :raises core_utilities.InvalidCutOutStartAndEndValuesError: If
            the amount which would be removed is as long as or longer
            than the event itself.
        """
        start, end = (
            core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(unknown_object)
            for unknown_object in (start, end)
        )
        self._assert_correct_start_and_end_values(
            start, end, condition=lambda start, end: start < end
        )

        duration = self.duration

        # Sum of what is removed in front of ``start`` and behind ``end``.
        difference_to_duration: core_parameters.DirectDuration = (
            core_parameters.DirectDuration(0)
        )

        if start > 0:
            difference_to_duration += start
        if end < duration:
            difference_to_duration += duration - end

        if difference_to_duration >= duration:
            raise core_utilities.InvalidCutOutStartAndEndValuesError(
                start, end, self, duration
            )

        self.duration -= difference_to_duration

    @core_utilities.add_copy_option
    def cut_off(  # type: ignore
        self,
        start: core_parameters.abc.Duration,
        end: core_parameters.abc.Duration,
    ) -> SimpleEvent:
        """Remove the part between ``start`` and ``end`` from the event.

        :param start: Start of the range to remove. Nothing happens if
            it isn't smaller than the duration of the event.
        :param end: End of the range to remove. Values bigger than the
            duration of the event are clipped to the duration.
        """
        start, end = (
            core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(unknown_object)
            for unknown_object in (start, end)
        )
        self._assert_correct_start_and_end_values(start, end)

        duration = self.duration
        if start < duration:
            if end > duration:
                end = duration
            self.duration -= end - start
# Type variable for the children of the generic container events defined
# below; bound to ``core_events.abc.Event``.
T = typing.TypeVar("T", bound=core_events.abc.Event)
class SequentialEvent(core_events.abc.ComplexEvent, typing.Generic[T]):
    """Event-Object which contains other Events which happen in a linear order."""

    # ###################################################################### #
    #                    private static methods                              #
    # ###################################################################### #

    @staticmethod
    def _get_index_at_from_absolute_time_tuple(
        absolute_time: typing.Union[core_parameters.abc.Duration, typing.Any],
        absolute_time_tuple: tuple[core_constants.DurationType, ...],
        duration: core_constants.DurationType,
    ) -> typing.Optional[int]:
        """Find the index of the event which is active at ``absolute_time``.

        :param absolute_time: The requested time; converted with
            `core_events.configurations.UNKNOWN_OBJECT_TO_DURATION`.
        :param absolute_time_tuple: Start time of each event.
        :param duration: Summed duration of all events.
        :return: The index or ``None`` if ``absolute_time`` is negative
            or not smaller than ``duration``.
        """
        absolute_time = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(
            absolute_time
        )
        if not (absolute_time < duration and absolute_time >= 0):
            return None
        # ``bisect_right`` returns the position behind the last start time
        # which is smaller than or equal to the requested time.
        return bisect.bisect_right(absolute_time_tuple, absolute_time) - 1

    # ###################################################################### #
    #                        private  methods                                #
    # ###################################################################### #

    # XXX: We need to have a private "_cut_off" method to simplify
    # overriding the public "cut_off" method in children classes
    # of SequentialEvent. This is necessary, because the implementation
    # of "squash_in" makes use of "_cut_off". In this way it is possible
    # to adjust the meaning of the public "cut_off" method, without
    # having to change the meaning of "squash_in" (this happens for instance
    # in the mutwo.core_events.Envelope class).
    def _cut_off(
        self,
        start: core_parameters.abc.Duration,
        end: core_parameters.abc.Duration,
        cut_off_duration: typing.Optional[core_parameters.abc.Duration] = None,
    ) -> SequentialEvent[T]:
        """Remove the range between ``start`` and ``end``.

        :param start: Start of the range to remove.
        :param end: End of the range to remove.
        :param cut_off_duration: ``end - start``; calculated if omitted.
        :return: The event itself.
        """
        if cut_off_duration is None:
            cut_off_duration = end - start

        # Collect core_events which are only active within the
        # cut_off - range
        event_to_delete_list = []
        # Start times are fetched once, before any child is shortened.
        absolute_time_tuple = self.absolute_time_tuple
        for event_index, (event_start, event_end, event) in enumerate(
            zip(absolute_time_tuple, absolute_time_tuple[1:] + (None,), self)
        ):
            # The last event has no successor which marks its end.
            if event_end is None:
                event_end = event_start + event.duration

            if event_start >= start and event_end <= end:
                event_to_delete_list.append(event_index)

            # Shorten event which are partly active within the
            # cut_off - range
            elif event_start <= start and event_end >= start:
                difference_to_event_start = start - event_start
                event.cut_off(
                    difference_to_event_start,
                    difference_to_event_start + cut_off_duration,
                )

            elif event_start < end and event_end > end:
                difference_to_event_start = event_start - start
                event.cut_off(0, cut_off_duration - difference_to_event_start)

        # Delete from the back, so that the remaining indices stay valid.
        for index in reversed(event_to_delete_list):
            del self[index]

        return self

    # ###################################################################### #
    #                           properties                                   #
    # ###################################################################### #

    @core_events.abc.ComplexEvent.duration.getter
    def duration(self) -> core_parameters.abc.Duration:
        duration_iterator = (event.duration for event in self)
        try:
            first_duration = next(duration_iterator)
        # If SequentialEvent is empty
        except StopIteration:
            return core_parameters.DirectDuration(0)
        # Only the empty case is handled above: errors which are raised
        # while adding the durations are no longer hidden.
        return functools.reduce(operator.add, duration_iterator, first_duration)

    @property
    def absolute_time_tuple(self) -> tuple[core_constants.Real, ...]:
        """Return absolute point in time for each event."""
        duration_iterator = (event.duration for event in self)
        # The last accumulated value is the end of the last event and
        # therefore not the start of any event.
        return tuple(
            core_utilities.accumulate_from_n(
                duration_iterator, core_parameters.DirectDuration(0)
            )
        )[:-1]

    @property
    def start_and_end_time_per_event(
        self,
    ) -> tuple[ranges.Range, ...]:
        """Return start and end time for each event."""
        duration_iterator = (event.duration for event in self)
        absolute_time_tuple = tuple(
            core_utilities.accumulate_from_n(
                duration_iterator, core_parameters.DirectDuration(0)
            )
        )
        return tuple(
            ranges.Range(*start_and_end_time)
            for start_and_end_time in zip(absolute_time_tuple, absolute_time_tuple[1:])
        )

    # ###################################################################### #
    #                           public methods                               #
    # ###################################################################### #

    def get_event_index_at(
        self, absolute_time: typing.Union[core_parameters.abc.Duration, typing.Any]
    ) -> typing.Optional[int]:
        """Get index of event which is active at the passed absolute_time.

        :param absolute_time: The absolute time where the method shall search
            for the active event.
        :type absolute_time: typing.Union[core_parameters.abc.Duration, typing.Any]
        :return: Index of event if there is any event at the requested absolute time
            and ``None`` if there isn't any event.

        **Example:**

        >>> from mutwo import core_events
        >>> sequential_event = core_events.SequentialEvent([core_events.SimpleEvent(2), core_events.SimpleEvent(3)])
        >>> sequential_event.get_event_index_at(1)
        0
        >>> sequential_event.get_event_index_at(3)
        1
        >>> sequential_event.get_event_index_at(100)
        None

        **Warning:**

        This method ignores events with duration == 0.
        """
        absolute_time_tuple = self.absolute_time_tuple
        return SequentialEvent._get_index_at_from_absolute_time_tuple(
            absolute_time, absolute_time_tuple, self.duration
        )

    def get_event_at(
        self, absolute_time: typing.Union[core_parameters.abc.Duration, typing.Any]
    ) -> typing.Optional[T]:
        """Get event which is active at the passed absolute_time.

        :param absolute_time: The absolute time where the method shall search
            for the active event.
        :type absolute_time: typing.Union[core_parameters.abc.Duration, typing.Any]
        :return: Event if there is any event at the requested absolute time
            and ``None`` if there isn't any event.

        **Example:**

        >>> from mutwo import core_events
        >>> sequential_event = core_events.SequentialEvent([core_events.SimpleEvent(2), core_events.SimpleEvent(3)])
        >>> sequential_event.get_event_at(1)
        SimpleEvent(duration = 2)
        >>> sequential_event.get_event_at(3)
        SimpleEvent(duration = 3)
        >>> sequential_event.get_event_at(100)
        None

        **Warning:**

        This method ignores events with duration == 0.
        """
        event_index = self.get_event_index_at(absolute_time)
        if event_index is None:
            return None
        return self[event_index]  # type: ignore

    @core_utilities.add_copy_option
    def cut_out(  # type: ignore
        self,
        start: core_constants.DurationType,
        end: core_constants.DurationType,
    ) -> SequentialEvent[T]:
        """Keep only the part between ``start`` and ``end``.

        Children which overlap with the range are shortened with their
        own ``cut_out`` method, all other children are removed. Children
        with duration 0 which start within the range are kept.

        :param start: Start of the range to keep.
        :param end: End of the range to keep.
        """
        start, end = (
            core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(unknown_object)
            for unknown_object in (start, end)
        )
        self._assert_correct_start_and_end_values(start, end)

        event_to_remove_index_list = []
        for event_index, (event_start, event) in enumerate(
            zip(self.absolute_time_tuple, self)
        ):
            event_duration = event.duration
            event_end = event_start + event_duration

            # Range to keep, relative to the start of the child.
            cut_out_start: core_parameters.DirectDuration = (
                core_parameters.DirectDuration(0)
            )
            cut_out_end = event_duration

            if event_start < start:
                cut_out_start += start - event_start

            if event_end > end:
                cut_out_end -= event_end - end

            if cut_out_start < cut_out_end:
                event.cut_out(cut_out_start, cut_out_end)
            elif not (
                # XXX: Support special case of events with duration = 0.
                event.duration == 0
                and event_start >= start
                and event_start <= end
            ):
                event_to_remove_index_list.append(event_index)

        # Delete from the back, so that the remaining indices stay valid.
        for event_to_remove_index in reversed(event_to_remove_index_list):
            del self[event_to_remove_index]

    @core_utilities.add_copy_option
    def cut_off(  # type: ignore
        self,
        start: core_constants.DurationType,
        end: core_constants.DurationType,
    ) -> SequentialEvent[T]:
        """Remove the part between ``start`` and ``end``.

        :param start: Start of the range to remove.
        :param end: End of the range to remove. Nothing happens if
            ``end - start`` isn't bigger than 0.
        """
        start, end = (
            core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(unknown_object)
            for unknown_object in (start, end)
        )
        cut_off_duration = end - start

        # Avoid unnecessary iterations
        if cut_off_duration > 0:
            return self._cut_off(start, end, cut_off_duration)

    @core_utilities.add_copy_option
    def squash_in(  # type: ignore
        self,
        start: typing.Union[core_parameters.abc.Duration, typing.Any],
        event_to_squash_in: core_events.abc.Event,
    ) -> SequentialEvent[T]:
        """Insert ``event_to_squash_in`` at ``start``.

        The range which the new event occupies is first removed with
        ``_cut_off``. If ``start`` lies inside of a child, this child is
        split and the new event is put between both parts.

        :param start: Absolute time where the new event shall start.
        :param event_to_squash_in: The event to insert.
        """
        start = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(start)
        self._assert_start_in_range(start)

        # Only run cut_off if necessary -> Improve performance
        if (event_to_squash_in_duration := event_to_squash_in.duration) > 0:
            cut_off_end = start + event_to_squash_in_duration
            self._cut_off(start, cut_off_end, event_to_squash_in_duration)

        # We already know that the given start is within the
        # range of the event. This means that if the start
        # is bigger than the duration, it is only due to a
        # floating point rounding error. To avoid odd bugs
        # we therefore have to define the bigger-equal
        # relationship.
        if start >= self.duration:
            self.append(event_to_squash_in)
        else:
            absolute_time_tuple = self.absolute_time_tuple
            try:
                insert_index = absolute_time_tuple.index(start)
            # There is an event on the given point which need to be
            # split.
            except ValueError:
                active_event_index = (
                    SequentialEvent._get_index_at_from_absolute_time_tuple(
                        start, absolute_time_tuple, self.duration
                    )
                )
                split_position = start - absolute_time_tuple[active_event_index]
                if (
                    split_position > 0
                    and split_position < self[active_event_index].duration
                ):
                    split_active_event = self[active_event_index].split_at(
                        split_position
                    )
                    # Head stays in front, tail follows: the new event
                    # goes between both parts.
                    self[active_event_index] = split_active_event[1]
                    self.insert(active_event_index, split_active_event[0])
                    active_event_index += 1

                insert_index = active_event_index

            self.insert(insert_index, event_to_squash_in)

    @core_utilities.add_copy_option
    def split_child_at(
        self, absolute_time: typing.Union[core_parameters.abc.Duration, typing.Any]
    ) -> SequentialEvent[T]:
        """Split the child which is active at ``absolute_time``.

        The child is replaced by the two events which its ``split_at``
        method returns. Nothing happens if a child already starts at
        ``absolute_time``.

        :param absolute_time: Absolute time where a child shall be
            split.
        :raises core_utilities.SplitUnavailableChildError: If no child
            is active at ``absolute_time``.
        """
        # Convert once, so that the comparison and the subtraction below
        # operate on the same type as the entries of the time tuple.
        split_time = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(
            absolute_time
        )
        absolute_time_tuple = self.absolute_time_tuple
        event_index = SequentialEvent._get_index_at_from_absolute_time_tuple(
            split_time, absolute_time_tuple, self.duration
        )

        # If there is no event at the requested time, raise error
        if event_index is None:
            raise core_utilities.SplitUnavailableChildError(absolute_time)

        # Only try to split child event at the requested time if there isn't
        # a segregation already anyway
        event_start = absolute_time_tuple[event_index]
        if split_time != event_start:
            # ``split_at`` expects a position relative to the start of the
            # child. The first returned event has to stay in front of the
            # second one (same order as in ``squash_in``).
            first_event, second_event = self[event_index].split_at(
                split_time - event_start
            )
            self[event_index] = second_event
            self.insert(event_index, first_event)
class SimultaneousEvent(core_events.abc.ComplexEvent, typing.Generic[T]):
    """Event-Object which contains other Event-Objects which happen at the same time."""

    # ###################################################################### #
    #                           properties                                   #
    # ###################################################################### #

    @core_events.abc.ComplexEvent.duration.getter
    def duration(self) -> core_constants.DurationType:
        # The longest child defines the duration; an empty
        # SimultaneousEvent has the duration 0.
        return max(
            (event.duration for event in self),
            default=core_parameters.DirectDuration(0),
        )

    # ###################################################################### #
    #                           public methods                               #
    # ###################################################################### #

    @core_utilities.add_copy_option
    def cut_out(  # type: ignore
        self,
        start: typing.Union[core_parameters.abc.Duration, typing.Any],
        end: typing.Union[core_parameters.abc.Duration, typing.Any],
    ) -> SimultaneousEvent[T]:
        """Call ``cut_out(start, end)`` on each child.

        :param start: Start of the range to keep.
        :param end: End of the range to keep.
        """
        start, end = (
            core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(unknown_object)
            for unknown_object in (start, end)
        )
        self._assert_correct_start_and_end_values(start, end)
        for event in self:
            event.cut_out(start, end)

    @core_utilities.add_copy_option
    def cut_off(  # type: ignore
        self,
        start: core_constants.DurationType,
        end: core_constants.DurationType,
    ) -> SimultaneousEvent[T]:
        """Call ``cut_off(start, end)`` on each child.

        :param start: Start of the range to remove.
        :param end: End of the range to remove.
        """
        start, end = (
            core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(unknown_object)
            for unknown_object in (start, end)
        )
        self._assert_correct_start_and_end_values(start, end)
        for event in self:
            event.cut_off(start, end)

    @core_utilities.add_copy_option
    def squash_in(  # type: ignore
        self,
        start: typing.Union[core_parameters.abc.Duration, typing.Any],
        event_to_squash_in: core_events.abc.Event,
    ) -> SimultaneousEvent[T]:
        """Call ``squash_in(start, event_to_squash_in)`` on each child.

        :param start: Absolute time where the new event shall start.
        :param event_to_squash_in: The event to insert.
        :raises core_utilities.ImpossibleToSquashInError: If a child
            doesn't have a ``squash_in`` method.
        """
        start = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(start)
        self._assert_start_in_range(start)

        for event in self:
            # Only the attribute lookup is guarded: an ``AttributeError``
            # which is raised inside of the child's ``squash_in`` is a
            # different problem and must not be reported as missing method.
            try:
                squash_in_child = event.squash_in  # type: ignore
            # Simple events don't have a 'squash_in' method.
            except AttributeError:
                raise core_utilities.ImpossibleToSquashInError(self, event_to_squash_in)
            squash_in_child(start, event_to_squash_in)

    @core_utilities.add_copy_option
    def split_child_at(
        self, absolute_time: core_constants.DurationType
    ) -> SimultaneousEvent[T]:
        """Split each child at ``absolute_time``.

        Children with a ``split_child_at`` method are split by this
        method. All other children are replaced by a
        :class:`SequentialEvent` which is built from the result of their
        ``split_at`` method.

        :param absolute_time: Absolute time where the children shall be
            split.
        """
        for event_index, event in enumerate(self):
            # Only the attribute lookup is guarded, so that errors raised
            # inside of the child's method aren't swallowed.
            try:
                split_child_at_time = event.split_child_at
            # simple events don't have a 'split_child_at' method
            except AttributeError:
                split_event = event.split_at(absolute_time)
                self[event_index] = SequentialEvent(split_event)
            else:
                split_child_at_time(absolute_time)
@core_utilities.add_tag_to_class
class TaggedSimpleEvent(SimpleEvent):
    """:class:`SimpleEvent` with tag.

    The class is passed through ``core_utilities.add_tag_to_class``.
    """
@core_utilities.add_tag_to_class
class TaggedSequentialEvent(
    SequentialEvent,
    typing.Generic[T],
    class_specific_side_attribute_tuple=("tag",),
):
    """:class:`SequentialEvent` with tag.

    The class is passed through ``core_utilities.add_tag_to_class`` and
    declares ``"tag"`` via the ``class_specific_side_attribute_tuple``
    class keyword.
    """
@core_utilities.add_tag_to_class
class TaggedSimultaneousEvent(
    SimultaneousEvent,
    typing.Generic[T],
    class_specific_side_attribute_tuple=("tag",),
):
    """:class:`SimultaneousEvent` with tag.

    The class is passed through ``core_utilities.add_tag_to_class`` and
    declares ``"tag"`` via the ``class_specific_side_attribute_tuple``
    class keyword.
    """