mapping.py

from __future__ import print_function

_usermap = {}

def add_mapping(watermark_value, pattern_value):
	_usermap[watermark_value] = pattern_value

class Mapping(dict):
	"""Mapping tracks equivalences from watermark values to
	pattern values."""

	def decode(self, watermark, unmapped_value):
		"""Decode watermark values into pattern values."""
		best_pv = None
		best_wv = None
		best_errs = 0
		for start in watermark.decode_start_indices():
			pv, wv, errs = self._decode(watermark, start,
							unmapped_value)
			if best_pv is None or errs < best_errs:
				best_pv = pv
				best_wv = wv
				best_errs = errs
		return best_pv, best_wv, best_errs

	def _decode(self, watermark, start, unmapped_value):
		pattern_values = []
		watermark_values = watermark.values(start)
		errs = 0
		for wv in watermark_values:
			try:
				pv = self[wv]
			except KeyError:
				try:
					pv = _usermap[wv]
				except KeyError:
					errs += 1
					pv = unmapped_value
			pattern_values.append(pv)
		return pattern_values, watermark_values, errs

class MappingSet(list):
	"""MappingSet is a set of Mapping instances.  Mapping instances
	added to MappingSet's should not be modified subsequently."""

	def add(self, m):
		for om in self:
			if m == om:
				return
		self.append(m)

def reconcile(initial, additional):
	"""Reconcile to sets of mappings.

	Do an all-by-all check to see whether two mappings define
	different key-value pairs.  If so, they are incompatible
	and no new mapping is generated; otherwise, combine the
	key-value pairs from the two mappings into a single mapping
	and add to new set."""

	if not initial:
		# If initial set is empty, then just return additions
		return additional
	new_mappings = MappingSet()
	for im in initial:
		for am in additional:
			new = _reconcile(im, am)
			if new:
				new_mappings.add(new)
	if not new_mappings:
		print("initial", len(initial))
		print("additional", len(additional))
		raise ValueError("incompatible mapping sets")
	return new_mappings

def _reconcile(im, am):
	m = Mapping()
	for key in set(im.keys() + am.keys()):
		try:
			iv = im[key]
		except KeyError:
			m[key] = am[key]
			continue
		try:
			av = am[key]
		except KeyError:
			m[key] = im[key]
			continue
		if iv != av:
			return None
		m[key] = iv
	return m

if __name__ == "__main__":
	import unittest
	class TestMapping(unittest.TestCase):

		def setUp(self):
			self.m1 = Mapping({
				"GTT": "A",
				"CGA": "B",
				"ATA": "C",
			})
			self.m2 = Mapping({
				"ATA": "C",
				"TTT": "D",
			})
			self.m3 = Mapping({
				"ATA": "X",
				"TTT": "D",
			})
			self.m4 = Mapping({
				"ATA": "C",
				"AAA": "E",
			})
			from watermark import DNAWatermark
			self.w = DNAWatermark("GTTATACGACCCGTT")

		def test_decode(self):
			pv, wv, errs = self.m1.decode(self.w, '?')
			self.assertEqual(''.join(pv), "ACB?A")
			self.assertEqual(errs, 1)

		def test_internal_reconcile(self):
			m = _reconcile(self.m1, self.m2)
			answer = Mapping({
				"GTT": "A",
				"CGA": "B",
				"ATA": "C",
				"TTT": "D",
			})
			self.assertEqual(m, answer)
			m = _reconcile(self.m1, self.m3)
			self.assertIsNone(m)

		def test_reconcile(self):
			m = reconcile(MappingSet([self.m1]),
				MappingSet([self.m2, self.m3, self.m4]))
			self.assertEqual(len(m), 2)
	unittest.main()