# -*- coding: utf-8 -*-
from __future__ import absolute_import
import re
import subprocess
from functools import partial
import lxml.html
from lxml.etree import iterwalk
[docs]def merge_dicts(*dicts):
"""
>>> sorted(merge_dicts({'foo': 'bar'}, {'bar': 'baz'}).items())
[('bar', 'baz'), ('foo', 'bar')]
"""
res = {}
for d in dicts:
res.update(d)
return res
[docs]def get_combined_keys(dicts):
"""
>>> sorted(get_combined_keys([{'foo': 'egg'}, {'bar': 'spam'}]))
['bar', 'foo']
"""
seen_keys = set()
for dct in dicts:
seen_keys.update(dct.keys())
return seen_keys
[docs]def tostr(val):
if isinstance(val, basestring):
return val
if isinstance(val, bool):
return str(int(val))
return str(val)
[docs]def flatten(x):
"""flatten(sequence) -> list
Return a single, flat list which contains all elements retrieved
from the sequence and all recursively contained sub-sequences
(iterables).
Examples::
>>> [1, 2, [3,4], (5,6)]
[1, 2, [3, 4], (5, 6)]
>>> flatten([[[1,2,3], (42,None)], [4,5], [6], 7, (8,9,10)])
[1, 2, 3, 42, None, 4, 5, 6, 7, 8, 9, 10]
"""
result = []
for el in x:
if hasattr(el, "__iter__"):
result.extend(flatten(el))
else:
result.append(el)
return result
EXTRA_SPACE_BEFORE_RE = re.compile(r' ([,:;.!?"\)])')
EXTRA_SPACE_AFTER_RE = re.compile(r'([\(]) ')
[docs]def smart_join(tokens):
"""
Join tokens without adding unneeded spaces before punctuation::
>>> smart_join(['Hello', ',', 'world', '!'])
'Hello, world!'
>>> smart_join(['(', '303', ')', '444-7777'])
'(303) 444-7777'
"""
text = " ".join(tokens)
text = EXTRA_SPACE_BEFORE_RE.sub(r"\1", text)
text = EXTRA_SPACE_AFTER_RE.sub(r"\1", text)
return text
[docs]def html_document_fromstring(data, encoding=None):
""" Load HTML document from string using lxml.html.HTMLParser """
parser = lxml.html.HTMLParser(encoding=encoding)
return lxml.html.document_fromstring(data, parser=parser)
[docs]def run_command(args, verbose=True):
"""
Execute a command in a subprocess, terminate it if exception occurs,
raise CalledProcessError exception if command returned non-zero exit code.
If ``verbose == True`` then print output as it appears using "print".
Unlike ``subprocess.check_call`` it doesn't assume that stdout
has a file descriptor - this allows printing to works in IPython notebook.
"""
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output = []
try:
while True:
line = p.stdout.readline()
if not line:
break
if verbose:
print(line.rstrip("\n\r"))
output.append(line)
p.wait()
if p.returncode != 0:
cmd = subprocess.list2cmdline(args)
raise subprocess.CalledProcessError(p.returncode, cmd, "\n".join(output))
finally:
# kill a process if exception occurs
if p.returncode is None:
p.terminate()
[docs]def alphanum_key(s):
""" Key func for sorting strings according to numerical value. """
return [int(c) if c.isdigit() else c for c in re.split('([0-9]+)', s)]
human_sorted = partial(sorted, key=alphanum_key)
human_sorted.__doc__ = "``sorted`` that uses :func:`alphanum_key` as a key function"
[docs]class BestMatch(object):
"""
Class for finding best non-overlapping matches in a sequence of tokens.
Override :meth:`get_sorted_ranges` method to define which results are best.
"""
def __init__(self, known):
self.known = known
if hasattr(known, 'iterkeys'):
keys_iter = known.iterkeys()
else:
keys_iter = known
self.max_length = max(len(key.split()) for key in keys_iter)
[docs] def find_ranges(self, tokens):
ranges = self._find_matches(tokens)
ranges = self._remove_overlapping(ranges, tokens)
return sorted(ranges) # sort by position
[docs] def get_sorted_ranges(self, ranges, tokens):
raise NotImplementedError()
def _find_matches(self, tokens):
# find all matching ranges
res = []
i = 0
while i < len(tokens):
max_length = min(self.max_length, max(len(tokens)-i, 0))
for length in reversed(range(1, max_length+1)):
lookup = " ".join(tokens[i:i+length])
if lookup in self.known:
res.append((i, length+i, lookup))
break
i += 1
return res
def _remove_overlapping(self, ranges, tokens):
# remove overlapping sequences, keeping the best
res = []
filled_indices = set()
for begin, end, lookup in self.get_sorted_ranges(ranges, tokens):
indices = set(range(begin, end))
if not indices & filled_indices:
res.append((begin, end, lookup))
filled_indices |= indices
return res
[docs]class LongestMatch(BestMatch):
"""
Class for finding longest non-overlapping matches in a sequence of tokens.
>>> known = {'North Las', 'North Las Vegas', 'North Pole', 'Vegas USA', 'Las Vegas', 'USA', "Toronto"}
>>> lm = LongestMatch(known)
>>> lm.max_length
3
>>> tokens = ["Toronto", "to", "North", "Las", "Vegas", "USA"]
>>> for start, end, matched_text in lm.find_ranges(tokens):
... print(start, end, tokens[start:end], matched_text)
(0, 1, ['Toronto'], 'Toronto')
(2, 5, ['North', 'Las', 'Vegas'], 'North Las Vegas')
(5, 6, ['USA'], 'USA')
"""
[docs] def get_sorted_ranges(self, ranges, tokens):
return sorted(ranges, key=lambda k: k[1]-k[0], reverse=True)
[docs]def substrings(txt, min_length=2, max_length=10, pad=''):
"""
>>> substrings("abc", 1)
['a', 'ab', 'abc', 'b', 'bc', 'c']
>>> substrings("abc", 2)
['ab', 'abc', 'bc']
>>> substrings("abc", 1, 2)
['a', 'ab', 'b', 'bc', 'c']
>>> substrings("abc", 1, 3, '$')
['$a', 'a', '$ab', 'ab', '$abc', 'abc', 'abc$', 'b', 'bc', 'bc$', 'c', 'c$']
"""
res = []
for start in range(len(txt)):
remaining_length = len(txt) - start
for length in range(min_length, min(max_length+1, remaining_length+1)):
token = txt[start:start+length]
if start == 0 and pad:
res.append(pad+token)
res.append(token)
if length == remaining_length and pad:
res.append(token+pad)
return res