Source code for pycarla.midiplayer

import multiprocessing
from typing import Any, List

import mido

from .generics import JackClient


[docs]class MIDIPlayer(JackClient): MIDI_PORT = 'Carla' def __init__(self): """ Creates a player which is able to connect to a Carla instance Optionally, `freewheel` can be used to start freeewheel mode before of playing. For now, only one Carla instance should be active. """ super().__init__("MIDIPlayer")
[docs] def activate(self): """ Activate the MIDI player client and set the connections. If the Carla instance is not found, this method rase a `RuntimeWarning`. To avoid it, use ``Carla.exists`` method. Note that ``Carla.start`` already does that! """ carla_ports = self.client.get_ports(self.MIDI_PORT, is_midi=True, is_input=True) if len(carla_ports) != 1: raise RuntimeWarning( "Cannot find the Carla instance, Retry later!") self.client.activate() self.client.midi_outports.clear() self.port = self.client.midi_outports.register('out') if not self.port.is_connected_to(carla_ports[0]): self.client.connect(self.port, carla_ports[0]) self.is_active = True
[docs] def clear(self): """ clears the `_messages` list """ del self._messages self._messages = []
[docs] def synthesize_messages(self, messages: List[mido.Message], sync=False, condition=lambda: True, **kwargs): """ Synthesize a list of messages 1. Connect the port of this jack client to Carla if not yet done 2. Send the list of messages to the Carla instance If `sync` is True, this function waits until all messages have been processed, otherwise, it suddenly returns. You can wait by calling the `wait` method of this object. This function is compatible with freewheeling mode. Freewheel prevents jack from waiting between return calls. This allows for the maximum allowed speed, but not output/input operation is done with system audio (i.e. you cannot listen/recording to anything while in freewheeling mode). `condition` is a function checked in the playing callback. If `condition()` is False, no message is sent. The callback start playing at the cycle after the one in which `condition()` becomes True. `kwargs` are passed to `wait` if `sync` is True. Note: Mido numbers channels 0 to 15 instead of 1 to 16. This makes them easier to work with in Python but you may want to add and subtract 1 when communicating with the user. """ self._messages = messages global msg, offset it = iter(self._messages) msg = next(it) offset = 0 self.ready_at = -1 @self.client.set_process_callback def process(frames): if self.is_active: global offset, msg for port in self.client.midi_outports: if not self.is_ready(): print(self.client.name + " ready!") # let other clients know that this is ready self.ready_at = self.client.last_frame_time elif condition(): # start only if other clients are ready too port.clear_buffer() while True: if offset >= frames: # wait for the correct block offset -= frames return # Note: This may raise an exception: port.write_midi_event(offset, msg.bytes()) try: msg = next(it) except StopIteration: self.end_wait.set() offset += round(msg.time * self.client.samplerate) self.activate() if sync: self.wait(**kwargs)
[docs] def synthesize_midi_note(self, pitch: int, velocity: int, duration: float, sustain: int = 0, soft: int = 0, sostenuto: int = 0, channel: int = 0, program: int = 0, **kwargs) -> multiprocessing.Process: """ set up a list of messages representing one note and then calls `self.synthesize_messages`. All keywords from that method can be used here. """ messages = [ # program change mido.Message('program_change', program=program, channel=channel, time=0), # sustain mido.Message('control_change', control=64, value=sustain, channel=channel, time=0), # sostenuto mido.Message('control_change', control=66, value=sostenuto, channel=channel, time=0), # soft mido.Message('control_change', control=67, value=soft, channel=channel, time=0), # note on mido.Message('note_on', velocity=velocity, note=pitch, channel=channel, time=0), # note off mido.Message('note_off', note=pitch, channel=channel, time=duration) ] return self.synthesize_messages(messages, **kwargs)
[docs] def synthesize_midi_file(self, midifile: Any, **kwargs) -> multiprocessing.Process: """ Send midi messages contained in `filename` using `self.synthesize_messages`. All keywords from that method can be used here. `midifile` can be a `mido.MidiFile` object or a string After the playback, ports are resetted """ if type(midifile) is str: midifile = mido.MidiFile(midifile) messages = [m for m in midifile] return self.synthesize_messages(messages, **kwargs)