From 8c6ef33bbc5dae51f2be128c9fa4a4b675326e7d Mon Sep 17 00:00:00 2001 From: Andy Sigler Date: Tue, 7 Feb 2017 14:10:54 -0500 Subject: [PATCH] Pipette transfer (#151) * len(WellSeries) returns length of Wells contained * adds transfer() command and helper methods * adds gradient option to transfer * adds tests to Pipette.transfer() * fixes bug when aspirate more volume than max_volume * adds helper methods to helpers.py * pylama * adds transfer test for linear gradient * pylama * returns tip if no trash attached to pipette * transfer source/targets can take Placeables with children to iterate through * adding special case where if using multi-channel and accessing WellSeries in .transfer() * pylama * tests multichannel transfer with WellSeries * changes kwarg defaults and key names * fixes merge conflicts * fixes tests * adds tests * adds test * handles Containers of length=1 * refactors transfer to include consolidate and distribute commands * pylama * adds tests for consolidate * adds docstings * adds more docstrings, comments * adds comments and organizes a bit * adds mix_after and mix_before * pylama * moves multichannel special case to helper method * adds coverage to test * removes smoothie config file from repo * containers with lenght=1 return list [container[0]] * distribute/consolidate can take only 1 or 0 tips * setting kwarg defaults inside consolidate/distribute methods * does not drop tip if tips=0 * updates tests * try and avoid small volumes occuring during carryover * adds tests for carryover volumes * pylama * adds test * uses kwargs touch_tip and blow_out * adds support for container slices to accept strings for start and stop * adds Placeable.get_children_from_slice() * uses new method * add Placeable.chain() and Placeable.group() * comment * Placeable.chain() length defaults to length of container * Placeable.well() return single well; Placeable.wells() returns list of wells * refactors Placeable.get_name(); adds test for comparing WellSeries * methods return WellSeries instead of list * undoes Placeable.get_name() refactor * . * typo * removes redudant methods between Placeable and WellSeries * adds tests * adds test for slices with negative step * allows group() range() chain() to move in negative direction * refactors .chain() * add feature for using .chain() or .group() through calling Container instance * adds access to Container.wells() through call instance * pylama * cleans up parse_string a bit * adds instance methods to WellSeries * multiple parsed argument on callable instrument, oh my * adds WellSeries.crop() and WellSeries.flatten() * trim() and flatten() return new WellSeries * adding blow_out to distribute * tests Placeable.remove_child() * includes blow_out commands while distributing with extra volume * adds access to Placeable.range() through string syntax using ':' * sets kwarg trash to default to True * allows >3 args to consolidate and distribute * tips kwargs is now new_tip='once'||'never'||'always'; repeater is now repeat * adds tests * removes all features other than slicing with lists --- .gitignore | 6 +- opentrons/containers/placeable.py | 97 ++- opentrons/helpers/helpers.py | 182 +++++ opentrons/instruments/pipette.py | 295 ++++++++- tests/opentrons/containers/test_containers.py | 18 + tests/opentrons/containers/test_grid.py | 16 + tests/opentrons/containers/test_placeable.py | 44 +- tests/opentrons/helpers/test_helpers.py | 12 +- tests/opentrons/labware/test_pipette.py | 625 +++++++++++++++--- 9 files changed, 1142 insertions(+), 153 deletions(-) diff --git a/.gitignore b/.gitignore index dab1cc8dcb3..f0454f521bd 100644 --- a/.gitignore +++ b/.gitignore @@ -80,9 +80,9 @@ sample_protocol.py # Local Calibration Data calibrations/ -*/calibrations/ -smoothie-config.ini -*/smoothie-config.ini +smoothie/ +*/smoothie +*/calibrations # SDK logs *.log diff --git a/opentrons/containers/placeable.py b/opentrons/containers/placeable.py index 8d09e0f6299..e73fa2fcb52 100644 --- a/opentrons/containers/placeable.py +++ b/opentrons/containers/placeable.py @@ -86,7 +86,9 @@ def __getitem__(self, name): Returns placeable by name or index If slice is given, returns a list """ - if isinstance(name, int) or isinstance(name, slice): + if isinstance(name, slice): + return self.get_children_from_slice(name) + elif isinstance(name, int): return self.get_children_list()[name] elif isinstance(name, str): return self.get_child_by_name(name) @@ -106,10 +108,10 @@ def __str__(self): self.__class__.__name__, self.get_name()) def __iter__(self): - return iter(self.children_by_reference.keys()) + return iter(self.get_children_list()) def __len__(self): - return len(self.children_by_name) + return len(self.get_children_list()) def __bool__(self): return True @@ -236,6 +238,25 @@ def get_child_by_name(self, name): """ return self.children_by_name.get(name) + def get_index_from_name(self, name): + """ + Retrieves child's name by index + """ + return self.get_children_list().index( + self.get_child_by_name(name)) + + def get_children_from_slice(self, s): + """ + Retrieves list of children within slice + """ + if isinstance(s.start, str): + s = slice( + self.get_index_from_name(s.start), s.stop, s.step) + if isinstance(s.stop, str): + s = slice( + s.start, self.get_index_from_name(s.stop), s.step) + return WellSeries(self.get_children_list()[s]) + def has_children(self): """ Returns *True* if :Placeable: has children @@ -490,7 +511,7 @@ def transpose(self, rows): def get_wellseries(self, matrix): """ - Returns the grid as a series of WellSeries + Returns the grid as a WellSeries of WellSeries """ res = OrderedDict() for row, cells in matrix.items(): @@ -500,19 +521,19 @@ def get_wellseries(self, matrix): res[row][col] = self.children_by_name[ ''.join(reversed(cell)) ] - res[row] = WellSeries(res[row]) + res[row] = WellSeries(res[row], name=row) return WellSeries(res) @property def rows(self): """ Rows can be accessed as: - >>> plate.row[0] - >>> plate.row['1'] + >>> plate.rows[0] + >>> plate.rows['1'] Wells can be accessed as: - >>> plate.row[0][0] - >>> plate.row['1']['A'] + >>> plate.rows[0][0] + >>> plate.rows['1']['A'] """ self.calculate_grid() return self.grid @@ -544,20 +565,27 @@ def cols(self): """ return self.columns - def well(self, name): + def well(self, name=None): """ Returns well by :name: """ - return self.get_child_by_name(name) + return self.__getitem__(name) - def wells(self): + def wells(self, *args): """ - Returns all wells + Returns child Well or list of child Wells """ - return self.get_children() + new_wells = None + if len(args) > 0: + new_wells = WellSeries([self.well(n) for n in args]) + else: + new_wells = WellSeries(self.get_children_list()) + if len(new_wells) is 1: + return new_wells[0] + return new_wells -class WellSeries(Placeable): +class WellSeries(Container): """ :WellSeries: represents a series of wells to make accessing rows and columns easier. You can access @@ -568,12 +596,15 @@ class WellSeries(Placeable): Default well index can be overriden using :set_offset: """ - def __init__(self, items): - self.items = items - if isinstance(items, dict): - items = list(self.items.values()) - self.values = items + def __init__(self, wells, name=None): + if isinstance(wells, dict): + self.items = wells + self.values = list(wells.values()) + else: + self.items = {w.get_name(): w for w in wells} + self.values = wells self.offset = 0 + self.name = name def set_offset(self, offset): """ @@ -581,22 +612,30 @@ def set_offset(self, offset): """ self.offset = offset - def __iter__(self): - return iter(self.values) - def __str__(self): return ''.format( ' '.join([str(well) for well in self.values])) - def __getitem__(self, index): - if isinstance(index, str): - return self.items[index] - else: - return list(self.values)[index] - def __getattr__(self, name): # getstate/setstate are used by pickle and are not implemented by # downstream objects (Wells) therefore raise attribute error if name in ('__getstate__', '__setstate__'): raise AttributeError() return getattr(self.values[self.offset], name) + + def get_name(self): + if self.name is None: + return str(self) + return str(self.name) + + def get_name_by_instance(self, well): + for name, value in self.items.items(): + if value is well: + return name + return None + + def get_children_list(self): + return list(self.values) + + def get_child_by_name(self, name): + return self.items.get(name) diff --git a/opentrons/helpers/helpers.py b/opentrons/helpers/helpers.py index e53db309f59..60a682f3adc 100644 --- a/opentrons/helpers/helpers.py +++ b/opentrons/helpers/helpers.py @@ -1,6 +1,7 @@ import json from opentrons.util.vector import Vector +from opentrons.containers.placeable import Placeable def unpack_coordinates(coordinates): @@ -98,3 +99,184 @@ def import_calibration_file(file_name, robot): with open(file_name) as f: json_string = '\n'.join(f) import_calibration_json(json_string, robot) + + +def _get_list(n): + if not hasattr(n, '__len__') or len(n) == 0 or isinstance(n, tuple): + n = [n] + if isinstance(n, Placeable) and len(n) == 1: + n = [n[0]] + return n + + +def _create_source_target_lists(s, t, **kwargs): + s = _get_list(s) + t = _get_list(t) + mode = kwargs.get('mode', 'transfer') + if mode == 'transfer': + if len(s) != len(t): + raise RuntimeError( + 'Transfer sources/targets must be same length') + elif mode == 'distribute': + if not (len(t) >= len(s) == 1): + raise RuntimeError( + 'Distribute must have 1 source and multiple targets') + s *= len(t) + elif mode == 'consolidate': + if not (len(s) >= len(t) == 1): + raise RuntimeError( + 'Consolidate must have multiple sources and 1 target') + t *= len(s) + return (s, t) + + +def _create_volume_list(v, total, **kwargs): + + gradient = kwargs.get('gradient', None) + + if isinstance(v, tuple): + return _create_volume_gradient( + v[0], v[-1], total, gradient=gradient) + + v = _get_list(v) + t_vol = len(v) + if (t_vol < total and t_vol != 1) or t_vol > total: + raise RuntimeError( + '{0} volumes do not match with {1} transfers'.format( + t_vol, total)) + if t_vol < total: + v = [v[0]] * total + return v + + +def _create_volume_gradient(min_v, max_v, total, gradient=None): + + diff_vol = max_v - min_v + + def _map_volume(i): + nonlocal diff_vol, total + rel_x = i / (total - 1) + rel_y = gradient(rel_x) if gradient else rel_x + return (rel_y * diff_vol) + min_v + + return [_map_volume(i) for i in range(total)] + + +def _compress_for_repeater(min_vol, max_vol, plan, **kwargs): + min_vol = float(min_vol) + max_vol = float(max_vol) + mode = kwargs.get('mode', 'transfer') + if mode == 'distribute': # combine target volumes into single aspirate + return _compress_for_distribute(min_vol, max_vol, plan, **kwargs) + if mode == 'consolidate': # combine target volumes into multiple aspirates + return _compress_for_consolidate(min_vol, max_vol, plan, **kwargs) + else: + return plan + + +def _compress_for_distribute(min_vol, max_vol, plan, **kwargs): + source = plan[0]['aspirate']['location'] + a_vol = 0 + temp_dispenses = [] + new_transfer_plan = [] + + def _add(): + nonlocal a_vol, temp_dispenses, new_transfer_plan, source + + if not temp_dispenses: + return + + # distribute commands get an extra volume added + if len(temp_dispenses) > 1: + a_vol += min_vol + + new_transfer_plan.append({ + 'aspirate': { + 'location': source, 'volume': a_vol + } + }) + for d in temp_dispenses: + new_transfer_plan.append({ + 'dispense': { + 'location': d['location'], 'volume': d['volume'] + } + }) + + if min_vol and len(temp_dispenses) > 1: + new_transfer_plan.append({ + 'blow_out': {'blow_out': True} + }) + + for p in plan: + this_vol = p['aspirate']['volume'] + if this_vol + a_vol > max_vol - min_vol: + _add() + a_vol = 0 + temp_dispenses = [] + a_vol += this_vol + temp_dispenses.append(p['dispense']) + _add() + return new_transfer_plan + + +def _compress_for_consolidate(min_vol, max_vol, plan, **kwargs): + target = plan[0]['dispense']['location'] + d_vol = 0 + temp_aspirates = [] + new_transfer_plan = [] + + def _add(): + nonlocal d_vol, temp_aspirates, new_transfer_plan, target + for a in temp_aspirates: + new_transfer_plan.append({ + 'aspirate': { + 'location': a['location'], 'volume': a['volume'] + } + }) + new_transfer_plan.append({ + 'dispense': { + 'location': target, 'volume': d_vol + } + }) + d_vol = 0 + temp_aspirates = [] + + for i, p in enumerate(plan): + this_vol = p['aspirate']['volume'] + if this_vol + d_vol > max_vol: + _add() + d_vol += this_vol + temp_aspirates.append(p['aspirate']) + _add() + return new_transfer_plan + + +def _expand_for_carryover(min_vol, max_vol, plan, **kwargs): + max_vol = float(max_vol) + carryover = kwargs.get('carryover', True) + if not carryover: + return plan + new_transfer_plan = [] + for p in plan: + source = p['aspirate']['location'] + target = p['dispense']['location'] + volume = float(p['aspirate']['volume']) + while volume > max_vol * 2: + new_transfer_plan.append({ + 'aspirate': {'location': source, 'volume': max_vol}, + 'dispense': {'location': target, 'volume': max_vol} + }) + volume -= max_vol + + # try and make sure no volumes are less than min_vol + if volume > max_vol: + volume /= 2 + new_transfer_plan.append({ + 'aspirate': {'location': source, 'volume': float(volume)}, + 'dispense': {'location': target, 'volume': float(volume)} + }) + new_transfer_plan.append({ + 'aspirate': {'location': source, 'volume': float(volume)}, + 'dispense': {'location': target, 'volume': float(volume)} + }) + return new_transfer_plan diff --git a/opentrons/instruments/pipette.py b/opentrons/instruments/pipette.py index 0668335655e..673f2f13f4f 100644 --- a/opentrons/instruments/pipette.py +++ b/opentrons/instruments/pipette.py @@ -1,11 +1,12 @@ import copy +import itertools from opentrons import containers from opentrons.containers.calibrator import Calibrator -from opentrons.containers.placeable import Placeable, humanize_location +from opentrons.containers.placeable import Placeable, WellSeries +from opentrons.containers.placeable import humanize_location from opentrons.instruments.instrument import Instrument - -import itertools +from opentrons.helpers import helpers class Pipette(Instrument): @@ -601,6 +602,9 @@ def _setup(): nonlocal location nonlocal repetitions + if volume is None: + volume = self.max_volume + self._associate_placeable(location) def _do(): @@ -609,7 +613,7 @@ def _do(): pass _description = "Mixing {0} times with a volume of {1}ul".format( - repetitions, str(self.current_volume) + repetitions, self.max_volume if volume is None else volume ) self.create_command( do=_do, @@ -958,7 +962,6 @@ def _setup(): nonlocal location if not location: location = self.get_next_tip() - self.current_tip(None) if location: placeable, _ = containers.unpack_location(location) @@ -1123,46 +1126,184 @@ def _do(): enqueue=enqueue) return self - def transfer(self, volume, source, destination=None, enqueue=True): - """ - transfer + # QUEUEABLE + def distribute(self, *args, **kwargs): """ - if not isinstance(volume, (int, float, complex)): - if volume and not destination: - destination = source - source = volume - volume = None + Distribute will move a volume of liquid from a single of source + to a list of target locations. See :any:`Transfer` for details + and a full list of optional arguments. - self.aspirate(volume, source, enqueue=enqueue) - self.dispense(volume, destination, enqueue=enqueue) - return self + Returns + ------- - # QUEUEABLE - def distribute(self, volume, source, destinations, enqueue=True): + This instance of :class:`Pipette`. + + Examples + -------- + .. + >>> plate = containers.load('96-flat', 'B1') + >>> p200 = instruments.Pipette(axis='a', max_volume=200) + >>> p200.distribute(50, plate[1], plate.cols[0]) # doctest: +ELLIPSIS + """ - distribute + kwargs['mode'] = 'distribute' + kwargs['new_tip'] = kwargs.get('new_tip', 'once') + if kwargs['new_tip'] is 'always': + kwargs['new_tip'] = 'once' + kwargs['mix_after'] = (0, 0) + return self.transfer(*args, **kwargs) + + # QUEUEABLE + def consolidate(self, *args, **kwargs): """ - volume = volume or self.max_volume - fractional_volume = volume / len(destinations) + Consolidate will move a volume of liquid from a list of sources + to a single target location. See :any:`Transfer` for details + and a full list of optional arguments. - self.aspirate(volume, source, enqueue=enqueue) - for well in destinations: - self.dispense(fractional_volume, well, enqueue=enqueue) + Returns + ------- - return self + This instance of :class:`Pipette`. + + Examples + -------- + .. + >>> plate = containers.load('96-flat', 'B1') + >>> p200 = instruments.Pipette(axis='a', max_volume=200) + >>> p200.consolidate(50, plate.cols[0], plate[1]) # doctest: +ELLIPSIS + + """ + kwargs['mode'] = 'consolidate' + kwargs['new_tip'] = kwargs.get('new_tip', 'once') + if kwargs['new_tip'] is 'always': + kwargs['new_tip'] = 'once' + kwargs['mix_before'] = (0, 0) + return self.transfer(*args, **kwargs) # QUEUEABLE - def consolidate(self, volume, sources, destination, enqueue=True): + def transfer(self, volumes, of, to, **kwargs): + """ - consolidate + Transfer will move a volume of liquid from a source location(s) + to a target location(s). It is a higher-level command, incorporating + other :any:`Pipette` commands, like :any:`aspirate` and + :any:`dispense`, designed to make protocol writing easier at the + cost of specificity. + + Parameters + ---------- + volumes : number, list, or tuple + The amount of volume to remove from each `sources` :any:`Placeable` + and add to each `targets` :any:`Placeable`. If `volumes` is a list, + each volume will be used for the sources/targets at the + matching index. If `volumes` is a tuple with two elements, + like `(20, 100)`, then a list of volumes will be generated with + a linear gradient between the two volumes in the tuple. + + of : Placeable or list + Single :any:`Placeable` or list of :any:`Placeable`s, from where + liquid will be :any:`aspirate`ed from. + + to : Placeable or list + Single :any:`Placeable` or list of :any:`Placeable`s, where + liquid will be :any:`dispense`ed to. + + tips : number + The number of clean tips this transfer command will use. If 0, + no tips will be picked up nor dropped. If 1, a single tip will be + used for all commands. + + trash : boolean + If `False` (default behavior) tips will be returned to their + tip rack. If `True` and a trash container has been attached + to this `Pipette`, then the tip will be sent to the trash + container. + + touch_tip : boolean + If `True`, a :any:`touch_tip` will occur following each + :any:`aspirate` and :any:`dispense`. If set to `False` (default), + no :any:`touch_tip` will occur. + + blow_out : boolean + If `True`, a :any:`blow_out` will occur following each + :any:`dispense`, but only if the pipette has no liquid left in it. + If set to `False` (default), no :any:`blow_out` will occur. + + mix_before : tuple + Specify the number of repetitions volume to mix, and a :any:`mix` + will proceed each :any:`aspirate` during the transfer and dispense. + The tuple's values is interpreted as (repetitions, volume). + + mix_after : tuple + Specify the number of repetitions volume to mix, and a :any:`mix` + will following each :any:`dispense` during the transfer or + consolidate. The tuple's values is interpreted as + (repetitions, volume). + + carryover : boolean + If `True` (default), any `volumes` that exceed the maximum volume + of this `Pipette` will be split into multiple smaller volumes. + + repeat : boolean + (Only applicable to :any:`distribute` and :any:`consolidate`)If + `True` (default), sequential :any:`aspirate` volumes will be + combined into one tip for the purpose of saving time. If `False`, + all volumes will be transferred seperately. + + gradient : lambda + Function for calculated the curve used for gradient volumes. + When `volumes` is a tuple of length 2, it's values are used + to create a list of gradient volumes. The default curve for + this gradient is linear (lambda x: x), however a method can + be passed with the `gradient` keyword argument to create a + custom curve. + + Returns + ------- + + This instance of :class:`Pipette`. + + Examples + -------- + .. + >>> plate = containers.load('96-flat', 'B1') + >>> p200 = instruments.Pipette(axis='a', max_volume=200) + >>> p200.transfer(50, plate[0], plate[1]) # doctest: +ELLIPSIS + """ - volume = volume or self.max_volume - fractional_volume = (volume) / len(sources) - for well in sources: - self.aspirate(fractional_volume, well, enqueue=enqueue) + sources = of + targets = to + enqueue = kwargs.get('enqueue', True) + kwargs['mode'] = kwargs.get('mode', 'transfer') + transfer_plan = self._create_transfer_plan( + volumes, sources, targets, **kwargs) + + tip_options = { + 'once': 1, + 'never': 0, + 'always': float('inf') + } + tips = tip_options.get(kwargs.pop('new_tip', 'once')) + + total_transfers = len(transfer_plan) + for i, plan in enumerate(transfer_plan): + this_aspirate = plan.get('aspirate') + if this_aspirate: + vol = this_aspirate['volume'] + loc = this_aspirate['location'] + self._add_tip_during_transfer(tips, **kwargs) + self._aspirate_during_transfer(vol, loc, **kwargs) + this_dispense = plan.get('dispense') + if this_dispense: + vol = this_dispense['volume'] + loc = this_dispense['location'] + self._dispense_during_transfer(vol, loc, **kwargs) + if plan.get('blow_out'): + self.blow_out(self.trash_container, enqueue=enqueue) + tips = self._remove_tip_during_transfer( + tips, i, total_transfers, **kwargs) - self.dispense(volume, destination, enqueue=enqueue) return self # QUEUEABLE @@ -1403,6 +1544,96 @@ def _volume_percentage(self, volume): return volume / self.max_volume + def _create_transfer_plan(self, v, s, t, **kwargs): + if self.channels > 1: + # SPECIAL CASE: if using multi-channel pipette, + # and the source or target is a WellSeries + # then avoid iterating through it's Wells + s = [s] if isinstance(s, WellSeries) else s + t = [t] if isinstance(t, WellSeries) else t + + # create list of volumes, sources, and targets of equal length + s, t = helpers._create_source_target_lists(s, t, **kwargs) + total_transfers = len(t) + v = helpers._create_volume_list(v, total_transfers, **kwargs) + + # convert to array of transfer dicts + transfer_plan = [] + for i in range(total_transfers): + transfer_plan.append({ + 'aspirate': {'location': s[i], 'volume': v[i]}, + 'dispense': {'location': t[i], 'volume': v[i]} + }) + if kwargs.get('carryover', True): + transfer_plan = helpers._expand_for_carryover( + self.min_volume, self.max_volume, transfer_plan, **kwargs) + + if kwargs.get('repeat', True): + transfer_plan = helpers._compress_for_repeater( + self.min_volume, self.max_volume, transfer_plan, **kwargs) + + return transfer_plan + + def _add_tip_during_transfer(self, tips, **kwargs): + """ + Performs a :any:`pick_up_tip` when running a :any:`transfer`, + :any:`distribute`, or :any:`consolidate`. + """ + enqueue = kwargs.get('enqueue', True) + if self.has_tip_rack() and tips > 0 and not self.current_tip(): + self.pick_up_tip(enqueue=enqueue) + + def _remove_tip_during_transfer(self, tips, i, total_transfers, **kwargs): + """ + Performs a :any:`drop_tip` or :any:`return_tip` when + running a :any:`transfer`, :any:`distribute`, or :any:`consolidate`. + """ + enqueue = kwargs.get('enqueue', True) + trash = kwargs.get('trash', True) + if tips > 1 or (i + 1 == total_transfers and tips > 0): + if trash and self.trash_container: + self.drop_tip(enqueue=enqueue) + else: + self.return_tip(enqueue=enqueue) + tips -= 1 + return tips + + def _aspirate_during_transfer(self, vol, loc, **kwargs): + """ + Performs an :any:`aspirate` when running a :any:`transfer`, and + optionally a :any:`touch_tip` afterwards. + """ + enqueue = kwargs.get('enqueue', True) + should_touch_tip = kwargs.get('touch_tip', False) + rate = kwargs.get('rate', 1) + mix_before = kwargs.get('mix_before', (0, 0)) + if isinstance(mix_before, (tuple, list)): + if len(mix_before) == 2 and 0 not in mix_before: + self.mix(mix_before[0], mix_before[1], loc, enqueue=enqueue) + self.aspirate(vol, loc, rate=rate, enqueue=enqueue) + if should_touch_tip: + self.touch_tip(enqueue=enqueue) + + def _dispense_during_transfer(self, vol, loc, **kwargs): + """ + Performs a :any:`dispense` when running a :any:`transfer`, and + optionally a :any:`mix`, :any:`touch_tip`, and/or + :any:`blow_out` afterwards. + """ + enqueue = kwargs.get('enqueue', True) + should_touch_tip = kwargs.get('touch_tip', False) + mix_after = kwargs.get('mix_after', (0, 0)) + should_blow_out = kwargs.get('blow_out', False) + rate = kwargs.get('rate', 1) + self.dispense(vol, loc, rate=rate, enqueue=enqueue) + if isinstance(mix_after, (tuple, list)): + if len(mix_after) == 2 and 0 not in mix_after: + self.mix(mix_after[0], mix_after[1], enqueue=enqueue) + if should_touch_tip: + self.touch_tip(enqueue=enqueue) + if should_blow_out and self.current_volume == 0: + self.blow_out(enqueue=enqueue) + def set_speed(self, **kwargs): """ Set the speed (mm/minute) the :any:`Pipette` plunger will move diff --git a/tests/opentrons/containers/test_containers.py b/tests/opentrons/containers/test_containers.py index f4c4e850be2..8baee62dbe6 100644 --- a/tests/opentrons/containers/test_containers.py +++ b/tests/opentrons/containers/test_containers.py @@ -27,6 +27,24 @@ def test_containers_list(self): res = containers.list() self.assertEquals(len(res), 37) + def test_bad_unpack_containers(self): + self.assertRaises( + ValueError, containers.placeable.unpack_location, 1) + + def test_iterate_without_parent(self): + c = self.generate_plate(4, 2, (5, 5), (0, 0), 5) + self.assertRaises( + Exception, next, c) + + def test_remove_child(self): + c = self.generate_plate(4, 2, (5, 5), (0, 0), 5) + c.remove_child('A2') + self.assertEquals(len(c), 3) + + def test_back_container_getitem(self): + c = self.generate_plate(4, 2, (5, 5), (0, 0), 5) + self.assertRaises(TypeError, c.__getitem__, (1, 1)) + def test_iterator(self): c = self.generate_plate(4, 2, (5, 5), (0, 0), 5) res = [well.coordinates() for well in c] diff --git a/tests/opentrons/containers/test_grid.py b/tests/opentrons/containers/test_grid.py index bf9b146ad61..86b6655547b 100644 --- a/tests/opentrons/containers/test_grid.py +++ b/tests/opentrons/containers/test_grid.py @@ -27,6 +27,22 @@ def test_rows_cols(self): for well, next_well in zip(wells[:-1], wells[1:]): self.assertEqual(well, next_well) + def test_remove_child(self): + robot = Robot.get_instance() + robot.reset() + + slot = 'B1' + + plate = containers.load('96-flat', slot, 'plate') + self.assertEquals(len(robot.containers()), 1) + plate.get_parent().remove_child(plate.get_name()) + self.assertEquals(len(robot.containers()), 0) + + plate = containers.load('96-flat', slot, 'plate') + self.assertEquals(len(robot.containers()), 1) + robot.deck[slot].remove_child(plate.get_name()) + self.assertEquals(len(robot.containers()), 0) + def test_placeable(self): plate = self.plate self.assertEqual(plate.rows[0].center(plate), (11.24, 14.34, 5.25)) diff --git a/tests/opentrons/containers/test_placeable.py b/tests/opentrons/containers/test_placeable.py index 1292fd12d4b..fab52bdeeee 100644 --- a/tests/opentrons/containers/test_placeable.py +++ b/tests/opentrons/containers/test_placeable.py @@ -16,13 +16,29 @@ def generate_plate(self, wells, cols, spacing, offset, radius, height=0): for i in range(0, wells): well = Well(properties={'radius': radius, 'height': height}) row, col = divmod(i, cols) - name = chr(row + ord('A')) + str(1 + col) + name = chr(col + ord('A')) + str(1 + row) coordinates = (col * spacing[0] + offset[0], row * spacing[1] + offset[1], 0) c.add(well, name, coordinates) return c + def assertWellSeriesEqual(self, w1, w2): + if hasattr(w1, '__len__') and hasattr(w2, '__len__'): + if len(w1) != len(w2): + print(w1) + print('lengths: {} and {}'.format(len(w1), len(w2))) + print(w2) + assert False + for i in range(len(w1)): + if w1[i] != w2[i]: + print(w1) + print('lengths: {} and {}'.format(len(w1), len(w2))) + print(w2) + assert False + else: + self.assertEquals(w1, w2) + def test_get_name(self): c = self.generate_plate(4, 2, (5, 5), (0, 0), 5) expected = '' @@ -40,7 +56,7 @@ def test_iterator(self): def test_next(self): c = self.generate_plate(4, 2, (5, 5), (0, 0), 5) well = c['A1'] - expected = c.get_child_by_name('A2') + expected = c.get_child_by_name('B1') self.assertEqual(next(well), expected) @@ -48,7 +64,7 @@ def test_int_index(self): c = self.generate_plate(4, 2, (5, 5), (0, 0), 5) self.assertEqual(c[3], c.get_child_by_name('B2')) - self.assertEqual(c[1], c.get_child_by_name('A2')) + self.assertEqual(c[1], c.get_child_by_name('B1')) def test_named_well(self): deck = Deck() @@ -165,3 +181,25 @@ def test_top_bottom(self): self.assertEqual( plate['A1'].top(10), (plate['A1'], Vector(5, 5, 20))) + + def test_slice_with_strings(self): + c = self.generate_plate(96, 8, (9, 9), (16, 11), 2.5, 40) + self.assertWellSeriesEqual(c['A1':'A2'], c[0:8]) + self.assertWellSeriesEqual(c['A12':], c.rows[-1][0:]) + self.assertWellSeriesEqual(c.rows['4':'8'], c.rows[3:7]) + self.assertWellSeriesEqual(c.cols['B':'E'], c.cols[1:4]) + self.assertWellSeriesEqual(c.cols['B']['1':'7'], c.cols[1][0:6]) + + def test_wells(self): + c = self.generate_plate(96, 8, (9, 9), (16, 11), 2.5, 40) + + self.assertWellSeriesEqual(c.well(0), c[0]) + self.assertWellSeriesEqual(c.well('A2'), c['A2']) + self.assertWellSeriesEqual(c.wells(0), c[0]) + self.assertWellSeriesEqual(c.wells(), c[0:]) + + expected = [c[n] for n in ['A1', 'B2', 'C3']] + self.assertWellSeriesEqual(c.wells('A1', 'B2', 'C3'), expected) + + expected = [c.cols[0][0], c.cols[0][5]] + self.assertWellSeriesEqual(c.cols['A'].wells('1', '6'), expected) diff --git a/tests/opentrons/helpers/test_helpers.py b/tests/opentrons/helpers/test_helpers.py index c9156b7907a..3f4545fe6da 100644 --- a/tests/opentrons/helpers/test_helpers.py +++ b/tests/opentrons/helpers/test_helpers.py @@ -1,21 +1,27 @@ import unittest from opentrons.util.vector import Vector -from opentrons.helpers.helpers import break_down_travel +from opentrons.helpers import helpers +from opentrons import instruments, containers, Robot class HelpersTest(unittest.TestCase): + def setUp(self): + self.robot = Robot.reset_for_tests() + self.p200 = instruments.Pipette(axis='b', max_volume=200) + self.plate = containers.load('96-flat', 'C1') + def test_break_down_travel(self): # with 3-dimensional points p1 = Vector(0, 0, 0) p2 = Vector(10, -12, 14) - res = break_down_travel( + res = helpers.break_down_travel( p1, p2, increment=5, mode='absolute') self.assertEquals(res[-1], p2) self.assertEquals(len(res), 5) p1 = Vector(10, -12, 14) - res = break_down_travel(Vector(0, 0, 0), p1, mode='relative') + res = helpers.break_down_travel(Vector(0, 0, 0), p1, mode='relative') expected = Vector( 0.46537410754407676, -0.5584489290528921, diff --git a/tests/opentrons/labware/test_pipette.py b/tests/opentrons/labware/test_pipette.py index 5b55265e0de..83975713296 100644 --- a/tests/opentrons/labware/test_pipette.py +++ b/tests/opentrons/labware/test_pipette.py @@ -38,6 +38,14 @@ def setUp(self): self.robot.home(enqueue=False) _, _, starting_z = self.robot._driver.get_head_position()['current'] + def test_bad_volume_percentage(self): + self.assertRaises(RuntimeError, self.p200._volume_percentage, -1) + + def test_aspirate_zero_volume(self): + self.assertEquals(len(self.robot.commands()), 0) + self.p200.aspirate(0) + self.assertEquals(len(self.robot.commands()), 0) + def test_get_plunger_position(self): self.assertEquals(self.p200._get_plunger_position('top'), 0) @@ -63,6 +71,8 @@ def test_set_max_volume(self): self.p200.aspirate() self.assertEquals(self.p200.current_volume, 202) + self.assertRaises(RuntimeError, self.p200.set_max_volume, 9) + def test_calibrate_by_position_name(self): self.p200.motor.move(9) @@ -293,7 +303,14 @@ def test_aspirate_no_args(self): def test_aspirate_invalid_max_volume(self): with self.assertRaises(RuntimeWarning): self.p200.aspirate(500) - self.robot.run() + + def test_volume_percentage(self): + self.assertRaises(RuntimeError, self.p200._volume_percentage, -1) + self.assertRaises(RuntimeError, self.p200._volume_percentage, 300) + self.assertEquals(self.p200._volume_percentage(100), 0.5) + self.assertEquals(len(self.robot.get_warnings()), 0) + self.p200._volume_percentage(self.p200.min_volume / 2) + self.assertEquals(len(self.robot.get_warnings()), 1) def test_dispense(self): self.p200.aspirate(100) @@ -364,102 +381,544 @@ def test_set_speed(self): self.p200.set_speed(dispense=100) self.assertEqual(self.p200.speeds['dispense'], 100) - def test_transfer_no_volume(self): - self.p200.aspirate = mock.Mock() - self.p200.dispense = mock.Mock() - self.p200.transfer(self.plate[0], self.plate[1]) - self.robot.run() + def test_distribute(self): + self.p200.reset() + self.p200.distribute( + 30, + self.plate[0], + self.plate[1:9], + new_tip='always' # should use only 1 tip + ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['pick'], + ['aspirating', '190', 'Well A1'], + ['dispensing', '30', 'Well B1'], + ['dispensing', '30', 'Well C1'], + ['dispensing', '30', 'Well D1'], + ['dispensing', '30', 'Well E1'], + ['dispensing', '30', 'Well F1'], + ['dispensing', '30', 'Well G1'], + ['blow_out', 'point'], + ['aspirating', '70', 'Well A1'], + ['dispensing', '30', 'Well H1'], + ['dispensing', '30', 'Well A2'], + ['blow_out', 'point'], + ['drop'] + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() - self.assertEqual( - self.p200.aspirate.mock_calls, - [mock.call.aspirate(None, self.plate[0], enqueue=True)]) - self.assertEqual( - self.p200.dispense.mock_calls, - [mock.call.dispense(None, self.plate[1], enqueue=True)]) + self.p200.reset() + self.p200.distribute( + 30, + self.plate[0], + self.plate[1:9], + new_tip='never' + ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['aspirating', '190', 'Well A1'], + ['dispensing', '30', 'Well B1'], + ['dispensing', '30', 'Well C1'], + ['dispensing', '30', 'Well D1'], + ['dispensing', '30', 'Well E1'], + ['dispensing', '30', 'Well F1'], + ['dispensing', '30', 'Well G1'], + ['blow_out', 'point'], + ['aspirating', '70', 'Well A1'], + ['dispensing', '30', 'Well H1'], + ['dispensing', '30', 'Well A2'], + ['blow_out', 'point'], + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() - def test_transfer_with_volume(self): - self.p200.aspirate = mock.Mock() - self.p200.dispense = mock.Mock() - self.p200.transfer(100, self.plate[0], self.plate[1]) - self.robot.run() + self.p200.reset() + self.p200.distribute( + 30, + self.plate[0], + self.plate + ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + total_dispenses = 0 + for c in self.robot.commands(): + if 'dispensing' in c.lower(): + total_dispenses += 1 + self.assertEqual(total_dispenses, 96) + self.robot.clear_commands() - self.assertEqual( - self.p200.aspirate.mock_calls, - [mock.call.aspirate(100, self.plate[0], enqueue=True)]) - self.assertEqual( - self.p200.dispense.mock_calls, - [mock.call.dispense(100, self.plate[1], enqueue=True)]) + self.p200.reset() + self.p200.distribute( + 30, + self.plate[0], + self.plate[1:9], + repeat=False, + trash=False + ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['pick'], + ['aspirating', '30', 'Well A1'], + ['dispensing', '30', 'Well B1'], + ['aspirating', '30', 'Well A1'], + ['dispensing', '30', 'Well C1'], + ['aspirating', '30', 'Well A1'], + ['dispensing', '30', 'Well D1'], + ['aspirating', '30', 'Well A1'], + ['dispensing', '30', 'Well E1'], + ['aspirating', '30', 'Well A1'], + ['dispensing', '30', 'Well F1'], + ['aspirating', '30', 'Well A1'], + ['dispensing', '30', 'Well G1'], + ['aspirating', '30', 'Well A1'], + ['dispensing', '30', 'Well H1'], + ['aspirating', '30', 'Well A1'], + ['dispensing', '30', 'Well A2'], + ['return'], + ['drop'] + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() def test_consolidate(self): - volume = 99 - sources = [self.plate[1], self.plate[2], self.plate[3]] - destination = self.plate[0] - fractional_volume = volume / len(sources) - self.p200.aspirate = mock.Mock() - self.p200.dispense = mock.Mock() - self.p200.consolidate(volume, sources, destination) + self.p200.reset() + self.p200.consolidate( + 30, + self.plate[0:8], + self.plate['A2'], + new_tip='always' # should use only 1 tip + ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['pick'], + ['aspirating', '30', 'Well A1'], + ['aspirating', '30', 'Well B1'], + ['aspirating', '30', 'Well C1'], + ['aspirating', '30', 'Well D1'], + ['aspirating', '30', 'Well E1'], + ['aspirating', '30', 'Well F1'], + ['dispensing', '180', 'Well A2'], + ['aspirating', '30', 'Well G1'], + ['aspirating', '30', 'Well H1'], + ['dispensing', '60', 'Well A2'], + ['drop'] + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() - self.assertEqual( - self.p200.aspirate.mock_calls, - [ - mock.call.aspirate( - fractional_volume, - self.plate[1], - enqueue=True - ), - mock.call.aspirate( - fractional_volume, - self.plate[2], - enqueue=True - ), - mock.call.aspirate( - fractional_volume, - self.plate[3], - enqueue=True - ) - ] + self.p200.reset() + self.p200.consolidate( + 30, + self.plate[0:8], + self.plate['A2'], + new_tip='never' ) - self.assertEqual( - self.p200.dispense.mock_calls, - [mock.call.dispense(volume, destination, enqueue=True)] + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['aspirating', '30', 'Well A1'], + ['aspirating', '30', 'Well B1'], + ['aspirating', '30', 'Well C1'], + ['aspirating', '30', 'Well D1'], + ['aspirating', '30', 'Well E1'], + ['aspirating', '30', 'Well F1'], + ['dispensing', '180', 'Well A2'], + ['aspirating', '30', 'Well G1'], + ['aspirating', '30', 'Well H1'], + ['dispensing', '60', 'Well A2'], + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() + + self.p200.reset() + self.p200.consolidate( + 30, + self.plate, + self.plate[0] ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + total_aspirates = 0 + for c in self.robot.commands(): + if 'aspirating' in c.lower(): + total_aspirates += 1 + self.assertEqual(total_aspirates, 96) + self.robot.clear_commands() - def test_distribute(self): - volume = 99 - destinations = [self.plate[1], self.plate[2], self.plate[3]] - fractional_volume = volume / len(destinations) + self.p200.reset() + self.p200.consolidate( + 30, + self.plate[0:8], + self.plate['A2'], + repeat=False + ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['pick'], + ['aspirating', '30', 'Well A1'], + ['dispensing', '30', 'Well A2'], + ['aspirating', '30', 'Well B1'], + ['dispensing', '30', 'Well A2'], + ['aspirating', '30', 'Well C1'], + ['dispensing', '30', 'Well A2'], + ['aspirating', '30', 'Well D1'], + ['dispensing', '30', 'Well A2'], + ['aspirating', '30', 'Well E1'], + ['dispensing', '30', 'Well A2'], + ['aspirating', '30', 'Well F1'], + ['dispensing', '30', 'Well A2'], + ['aspirating', '30', 'Well G1'], + ['dispensing', '30', 'Well A2'], + ['aspirating', '30', 'Well H1'], + ['dispensing', '30', 'Well A2'], + ['drop'] + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() - self.p200.aspirate = mock.Mock() - self.p200.dispense = mock.Mock() - self.p200.distribute(volume, self.plate[0], destinations) + def test_transfer(self): - self.assertEqual( - self.p200.dispense.mock_calls, - [ - mock.call.dispense( - fractional_volume, - self.plate[1], - enqueue=True - ), - mock.call.dispense( - fractional_volume, - self.plate[2], - enqueue=True - ), - mock.call.dispense( - fractional_volume, - self.plate[3], - enqueue=True - ) - ] + self.p200.reset() + self.p200.transfer( + 30, + self.plate[0:8], + self.plate[1:9], + touch_tip=True, + blow_out=True, + trash=True ) - self.assertEqual( - self.p200.aspirate.mock_calls, - [ - mock.call.aspirate(volume, self.plate[0], enqueue=True) - ] + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['pick'], + ['aspirating', '30', 'Well A1'], + ['touch'], + ['dispensing', '30', 'Well B1'], + ['touch'], + ['blow'], + ['aspirating', '30', 'Well B1'], + ['touch'], + ['dispensing', '30', 'Well C1'], + ['touch'], + ['blow'], + ['aspirating', '30', 'Well C1'], + ['touch'], + ['dispensing', '30', 'Well D1'], + ['touch'], + ['blow'], + ['aspirating', '30', 'Well D1'], + ['touch'], + ['dispensing', '30', 'Well E1'], + ['touch'], + ['blow'], + ['aspirating', '30', 'Well E1'], + ['touch'], + ['dispensing', '30', 'Well F1'], + ['touch'], + ['blow'], + ['aspirating', '30', 'Well F1'], + ['touch'], + ['dispensing', '30', 'Well G1'], + ['touch'], + ['blow'], + ['aspirating', '30', 'Well G1'], + ['touch'], + ['dispensing', '30', 'Well H1'], + ['touch'], + ['blow'], + ['aspirating', '30', 'Well H1'], + ['touch'], + ['dispensing', '30', 'Well A2'], + ['touch'], + ['blow'], + ['drop'] + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() + + def test_transfer_volume_control(self): + + self.p200.reset() + self.p200.transfer( + 300, + self.plate[0], + self.plate[1], + touch_tip=False, + blow_out=False + ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['pick'], + ['aspirating', '150', 'Well A1'], + ['dispensing', '150', 'Well B1'], + ['aspirating', '150', 'Well A1'], + ['dispensing', '150', 'Well B1'], + ['drop'] + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() + + self.p200.reset() + self.p200.transfer( + 598, + self.plate[0], + self.plate[1], + touch_tip=False, + blow_out=False + ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['pick'], + ['aspirating', '200', 'Well A1'], + ['dispensing', '200', 'Well B1'], + ['aspirating', '199', 'Well A1'], + ['dispensing', '199', 'Well B1'], + ['aspirating', '199', 'Well A1'], + ['dispensing', '199', 'Well B1'], + ['drop'] + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() + + self.p200.reset() + self.assertRaises( + RuntimeWarning, + self.p200.transfer, + 300, + self.plate[0], + self.plate[1], + carryover=False + ) + self.robot.clear_commands() + + self.p200.reset() + self.p200.distribute( + (10, 80), + self.plate[0], + self.plate.rows[1], + touch_tip=False, + blow_out=False + ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['pick'], + ['aspirating', '160', 'Well A1'], + ['dispensing', '10', 'Well A2'], + ['dispensing', '20', 'Well B2'], + ['dispensing', '30', 'Well C2'], + ['dispensing', '40', 'Well D2'], + ['dispensing', '50', 'Well E2'], + ['blow_out', 'point'], + ['aspirating', '140', 'Well A1'], + ['dispensing', '60', 'Well F2'], + ['dispensing', '70', 'Well G2'], + ['blow_out', 'point'], + ['aspirating', '80', 'Well A1'], + ['dispensing', '80', 'Well H2'], + ['drop'] + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() + + self.p200.reset() + self.p200.distribute( + (10, 80), + self.plate[0], + self.plate.rows[1], + touch_tip=False, + blow_out=False, + gradient=lambda x: 1.0 - x + ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['pick'], + ['aspirating', '160', 'Well A1'], + ['dispensing', '80', 'Well A2'], + ['dispensing', '70', 'Well B2'], + ['blow_out', 'point'], + ['aspirating', '190', 'Well A1'], + ['dispensing', '60', 'Well C2'], + ['dispensing', '50', 'Well D2'], + ['dispensing', '40', 'Well E2'], + ['dispensing', '30', 'Well F2'], + ['blow_out', 'point'], + ['aspirating', '40', 'Well A1'], + ['dispensing', '20', 'Well G2'], + ['dispensing', '10', 'Well H2'], + ['blow_out', 'point'], + ['drop'] + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() + + def test_transfer_mix(self): + self.p200.reset() + self.p200.transfer( + 200, + self.plate[0], + self.plate[1], + mix_before=(1, 10), + mix_after=(1, 10) + ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['pick'], + ['mix', '10'], + ['aspirating', 'Well A1'], + ['dispensing'], + ['aspirating', '200', 'Well A1'], + ['dispensing', '200', 'Well B1'], + ['mix', '10'], + ['aspirating', 'Well B1'], + ['dispensing'], + ['drop'] + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() + + def test_consolidate_mix(self): + self.p200.reset() + self.p200.consolidate( + 200, + self.plate[0], + self.plate[1], + mix_before=(1, 10), + mix_after=(1, 10) + ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['pick'], + ['aspirating', '200', 'Well A1'], + ['dispensing', '200', 'Well B1'], + ['mix', '10'], + ['aspirating', 'Well B1'], + ['dispensing'], + ['drop'] + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() + + def test_distribute_mix(self): + self.p200.reset() + self.p200.distribute( + 200, + self.plate[0], + self.plate[1], + mix_before=(1, 10), + mix_after=(1, 10) ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['pick'], + ['mix', '10'], + ['aspirating', 'Well A1'], + ['dispensing'], + ['aspirating', '200', 'Well A1'], + ['dispensing', '200', 'Well B1'], + ['drop'] + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() + + def test_transfer_multichannel(self): + self.p200.reset() + self.p200.channels = 8 + self.p200.transfer( + 200, + self.plate.rows[0], + self.plate.rows[1], + touch_tip=False, + blow_out=False, + trash=False + ) + # from pprint import pprint + # print('\n\n***\n') + # pprint(self.robot.commands()) + expected = [ + ['pick'], + ['aspirating', '200', 'Well A1'], + ['dispensing', '200', 'Well A2'], + ['return'], + ['drop'] + ] + self.assertEqual(len(self.robot.commands()), len(expected)) + for i, c in enumerate(self.robot.commands()): + for s in expected[i]: + self.assertTrue(s.lower() in c.lower()) + self.robot.clear_commands() def test_touch_tip(self): self.p200.move_to = mock.Mock()