Source code for geoparsepy.geo_parse_lib

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
..
	/////////////////////////////////////////////////////////////////////////
	//
	// (c) Copyright University of Southampton, 2012
	//
	// Copyright in this software belongs to University of Southampton,
	// Highfield, Southampton, SO17 1BJ, United Kingdom
	//
	// This software may not be used, sold, licensed, transferred, copied
	// or reproduced in whole or in part in any manner or form or in or
	// on any media by any person other than in accordance with the terms
	// of the Licence Agreement supplied with the software, or otherwise
	// without the prior written consent of the copyright owners.
	//
	// This software is distributed WITHOUT ANY WARRANTY, without even the
	// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
	// PURPOSE, except where stated in the Licence Agreement supplied with
	// the software.
	//
	// Created By : Stuart E. Middleton
	// Created Date : 2014/04/15
	// Created for Project: REVEAL
	//
	/////////////////////////////////////////////////////////////////////////
	//
	// Dependencies: Source code derived from original location_extraction python code in TRIDEC
	//
	/////////////////////////////////////////////////////////////////////////

Geoparsing is based on named entity matching against OpenStreetMap (OSM) locations. All locations whose names match tokens in a target text sentence will be selected. This results in a set of OSM locations, all sharing a common name or name variant, for each matched token in the text. Geoparsing includes the following features:
	* *token expansion* using location name variants (i.e. OSM multi-lingual names, short names and acronyms)
	* *token expansion* using location type variants (e.g. street, st.)
	* *token filtering* of single token location names against WordNet (non-nouns), language specific stoplists and people's first names (nltk.corpus.names.words()) to reduce false positive matches
	* *prefix checking* when matching, in case a first name prefixes a location token(s), to avoid matching people's full names as locations (e.g. Victoria Derbyshire != Derbyshire)
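The prefix check can be illustrated with a minimal sketch (the helper name and the toy first-name set standing in for nltk.corpus.names.words() are hypothetical, not the geoparsepy implementation):

```python
# toy stand-in for the lowercased first-name list built from nltk.corpus.names.words()
FIRST_NAMES = {'victoria', 'joe'}

def is_person_prefixed(tokens, match_start):
    # a location match is suspect if the token immediately before it is a first name,
    # e.g. 'victoria derbyshire' is a person, not the county Derbyshire
    return match_start > 0 and tokens[match_start - 1] in FIRST_NAMES

tokens = ['victoria', 'derbyshire', 'visited', 'derbyshire']
print(is_person_prefixed(tokens, 1))  # True  -> reject the match at token 1
print(is_person_prefixed(tokens, 3))  # False -> keep the match at token 3
```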

Location disambiguation is the process of choosing which of a set of possible OSM locations, all with the same name, is the best match. Location disambiguation is based on an evidential approach, with evidential features detailed below in order of importance:
	* *token subsumption*, rejecting smaller phrases over larger ones (e.g. 'New York' will prefer [New York, USA] to [York, UK])
	* *nearby parent region*, preferring locations with a parent region also appearing within a semantic distance (e.g. 'New York in USA' will prefer [New York, USA] to [New York, BO, Sierra Leone])
	* *nearby locations*, preferring locations with close-by or overlapping locations within a semantic distance (e.g. 'London St and Commercial Road' will select from road name choices with the same name based on spatial proximity)
	* *nearby geotag*, preferring locations that are close-by or overlapping a geotag
	* *general before specific*, rejecting locations with a higher admin level (or no admin level at all) compared to locations with a lower admin level (e.g. 'New York' will prefer [New York, USA] to [New York, BO, Sierra Leone])
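Token subsumption, the first of these features, can be sketched as follows (hypothetical data structures, not the geoparsepy internals): a match whose token span lies strictly inside another match's span is discarded.

```python
def subsume(matches):
    # drop any matched phrase whose token span is contained in a longer match,
    # so 'york' inside 'new york' is rejected
    keep = []
    for m in matches:
        s, e = m['span']
        contained = any(
            o is not m and o['span'][0] <= s and e <= o['span'][1]
            for o in matches
        )
        if not contained:
            keep.append(m)
    return keep

matches = [
    {'name': 'new york', 'span': (0, 2)},
    {'name': 'york',     'span': (1, 2)},
]
print([m['name'] for m in subsume(matches)])  # ['new york']
```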

Currently the following languages are supported:
	* English, French, German, Italian, Portuguese, Russian, Ukrainian
	* All other languages will work, but no language specific token expansion will be available

This geoparsing algorithm uses a large memory footprint, proportional to the number of cached locations, to maximize matching speed. It can be naively parallelized, with multiple geoparse processes loaded with different sets of locations and the geoparse results aggregated in a last process where location disambiguation is applied. This approach has been validated across an Apache Storm cluster.
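The split-and-aggregate scheme can be sketched as a toy example (the shard matching below is hypothetical substring matching, not the geoparsepy API; on a cluster each shard would be handled by its own geoparse process):

```python
def geoparse_shard(text, shard):
    # hypothetical worker: match the text against one shard of cached location names
    return [loc for loc in shard if loc in text.lower()]

shards = [['new york', 'york'], ['london', 'commercial road']]
text = 'Flooding in New York and London'

# shown sequentially here; the per-shard candidate lists would be sent to a
# final aggregation process where location disambiguation is applied
partial = [geoparse_shard(text, shard) for shard in shards]
candidates = sorted({loc for part in partial for loc in part})
print(candidates)  # ['london', 'new york', 'york']
```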

"""

# Performance data
# - geo parse [1 process]
#   + 10,000 tweets with 1% geotag = 1720s = 5.8 items / sec
#   + 3,800 instagram each with geotag = 6702s = 0.56 items / sec
# - geo aggregate [1 process]
#   + 10,000 tweets with 1% geotag = 1642s = 6.0 items / sec
#   + 3,800 instagram each with geotag = 955s = 3.7 items / sec


import os, re, sys, copy, collections, codecs, string, configparser, traceback, datetime, time, math
import nltk, nltk.stem.porter, nltk.corpus, numpy, shapely, shapely.speedups, shapely.prepared, shapely.wkt, shapely.geometry
from nltk.util import ngrams
from nltk.corpus import wordnet
import soton_corenlppy
import pkg_resources


# enumeration for OSM types generated by calc_OSM_type()
list_osm_types = ['transport','building','admin','other']

def get_geoparse_config( **kwargs ) :
	"""
	return a geospatial config object for this specific set of languages. the config object contains an instantiated NLTK stemmer, tokenizer and settings tailored for the chosen language set. all available language specific corpus files will be read into memory, such as street name variants.

	geoparse config settings are below:
		* *lower_tokens* = True, since locations are not always referenced in text as capitalized Proper Nouns (overrides variable keyword args)
		* *building_types* = dict, containing building type name variants loaded from each selected language's corpus file
		* *street_types* = dict, containing street type name variants loaded from each selected language's corpus file
		* *admin_types* = dict, containing admin region type name variants loaded from each selected language's corpus file
		* *gazeteers* = dict, containing local gazetteer name variants not provided in the OSM database for specific OSM IDs
		* *use_wordnet* = True, remove 1 token location names that appear in wordnet with non location meanings

	| note: for a list of default config settings see soton_corenlppy.common_parse_lib.get_common_config()
	| note: a config object approach is used, as opposed to a global variable, to allow geo_parse_lib functions to work in a multi-threaded environment

	:param kwargs: variable argument to override any default config values

	:return: configuration settings to be used by all geo_parse_lib functions
	:rtype: dict
	"""

	dictArgs = copy.copy( kwargs )

	# default corpus directory is where the python lib package has been installed to
	if not 'corpus_dir' in dictArgs :
		if pkg_resources.resource_exists( __name__, 'geo_parse_lib.py' ) :
			# if run as an installed python lib
			strCorpusDir = os.path.dirname( pkg_resources.resource_filename( __name__, 'geo_parse_lib.py' ) )
		else :
			# if run as a standalone file in a dir
			strCorpusDir = os.path.dirname( __file__ )
		dictArgs['corpus_dir'] = strCorpusDir
	else :
		# use the caller supplied corpus dir
		strCorpusDir = dictArgs['corpus_dir']

	# always use lower case for geoparse work as microblog references to locations do not follow nice camel case
	# note: do this before soton_corenlppy.common_parse_lib.get_common_config() because we want things like stopwords and names to be lowercase for subsequent matching
	dictArgs['lower_tokens'] = True

	# convert 's to s to make entity matching easier
	dictArgs['apostrophe_handling'] = 'strip'

	# setup whitespace and punctuation for geoparse work (unless caller has provided something else in which case this default will be overridden)
	if not 'whitespace' in dictArgs :
		dictArgs['whitespace'] = '[]"\u201a\u201b\u201c\u201d()'
	if not 'punctuation' in dictArgs :
		dictArgs['punctuation'] = """,;\/:+-#~&*=!?"""

	# setup common values
	dict_geospatial_config = soton_corenlppy.common_parse_lib.get_common_config( **dictArgs )

	# check single word place names against wordnet to avoid common words mis-matching
	if not 'use_wordnet' in dictArgs :
		dict_geospatial_config['use_wordnet'] = True
	else :
		dict_geospatial_config['use_wordnet'] = dictArgs['use_wordnet']

	# load local language specific geospatial stoplist
	# created by ITINNO for use with location name tokens
	listStoplist = dict_geospatial_config['stoplist']
	for strCode in dictArgs['lang_codes'] :
		strStoplistFile = strCorpusDir + os.sep + 'corpus-geo-stoplist-' + strCode + '.txt'
		if os.path.isfile(strStoplistFile) :
			if dict_geospatial_config['logger'] != None :
				dict_geospatial_config['logger'].info( 'loading stoplist from ' + strStoplistFile )
			readHandle = codecs.open( strStoplistFile, 'r', 'utf-8' )
			for line in readHandle :
				# remove newline at end (might not have one if last line)
				line = line.rstrip('\n')
				line = line.rstrip('\r')
				# remove UTF8 byte-order mark at start of file (added for UTF encoded files to indicate if its UTF-8, UTF-16 etc)
				line = line.lstrip( '\ufeff' )
				# ignore comments in stoplist file
				if (len(line) > 1) and (not line.startswith('#')) :
					strTextClean = soton_corenlppy.common_parse_lib.clean_text( line, dict_geospatial_config )
					if len(strTextClean) > 0 :
						if not strTextClean in listStoplist :
							listStoplist.append( strTextClean )
		else :
			if dict_geospatial_config['logger'] != None :
				dict_geospatial_config['logger'].info( 'missing stoplist file ' + strStoplistFile + ' (not loaded)' )
	dict_geospatial_config['stoplist'] = listStoplist

	# load whitelist of good names to avoid wordnet rejection
	listWhitelist = []
	strWhitelistFile = strCorpusDir + os.sep + 'corpus-geo-whitelist.txt'
	if os.path.isfile(strWhitelistFile) :
		if dict_geospatial_config['logger'] != None :
			dict_geospatial_config['logger'].info( 'loading whitelist from ' + strWhitelistFile )
		readHandle = codecs.open( strWhitelistFile, 'r', 'utf-8' )
		for line in readHandle :
			# remove newline at end (might not have one if last line)
			line = line.rstrip('\n')
			line = line.rstrip('\r')
			# remove UTF8 byte-order mark at start of file (added for UTF encoded files to indicate if its UTF-8, UTF-16 etc)
			line = line.lstrip( '\ufeff' )
			# ignore comments in whitelist file
			if (len(line) > 1) and (not line.startswith('#')) :
				strTextClean = soton_corenlppy.common_parse_lib.clean_text( line, dict_geospatial_config )
				if len(strTextClean) > 0 :
					if not strTextClean in listWhitelist :
						listWhitelist.append( strTextClean )
	dict_geospatial_config['whitelist'] = listWhitelist

	# load blacklist of bad names to avoid OSM names that match very common words
	listBlacklist = []
	strBlacklistFile = strCorpusDir + os.sep + 'corpus-geo-blacklist.txt'
	if os.path.isfile(strBlacklistFile) :
		if dict_geospatial_config['logger'] != None :
			dict_geospatial_config['logger'].info( 'loading blacklist from ' + strBlacklistFile )
		readHandle = codecs.open( strBlacklistFile, 'r', 'utf-8' )
		for line in readHandle :
			# remove newline at end (might not have one if last line)
			line = line.rstrip('\n')
			line = line.rstrip('\r')
			# remove UTF8 byte-order mark at start of file (added for UTF encoded files to indicate if its UTF-8, UTF-16 etc)
			line = line.lstrip( '\ufeff' )
			# ignore comments in blacklist file
			if (len(line) > 1) and (not line.startswith('#')) :
				strTextClean = soton_corenlppy.common_parse_lib.clean_text( line, dict_geospatial_config )
				if len(strTextClean) > 0 :
					if not strTextClean in listBlacklist :
						listBlacklist.append( strTextClean )
	dict_geospatial_config['blacklist'] = listBlacklist

	#
	# load all language specific location_type corpus files
	# to get the building, street and admin type information (stopwords, prefix/suffix names)
	#
	dictBuildingTypes = { 'title' : [], 'type' : [] }
	dictStreetTypes = { 'title' : [], 'type' : [] }
	dictAdminTypes = { 'title' : [], 'type' : [] }
	for strCode in dictArgs['lang_codes'] :

		# language specific building types
		strCorpusFile = strCorpusDir + os.sep + 'corpus-buildingtype-' + strCode + '.txt'
		if os.path.isfile(strCorpusFile) :
			if dict_geospatial_config['logger'] != None :
				dict_geospatial_config['logger'].info( 'loading building types from ' + strCorpusFile )
			listTypeInfo = read_location_type_corpus( strCorpusFile, dict_geospatial_config )
			dictBuildingTypes['title'].extend( listTypeInfo[0] )
			dictBuildingTypes['type'].extend( listTypeInfo[1] )
			#
			# remove from name set all location title prefixes
			# these will otherwise be assumed to be part of a name (e.g. west = title prefix, west = boys name, west london is OK but joe london is not)
			#
			for listTitles in listTypeInfo[0] :
				for tupleEntry in listTitles :
					if tupleEntry[0] != 'suffix' :
						if tupleEntry[-1] in dict_geospatial_config['first_names'] :
							dict_geospatial_config['first_names'].remove( tupleEntry[-1] )

		# language specific street types
		strCorpusFile = strCorpusDir + os.sep + 'corpus-streettype-' + strCode + '.txt'
		if os.path.isfile(strCorpusFile) :
			if dict_geospatial_config['logger'] != None :
				dict_geospatial_config['logger'].info( 'loading street types from ' + strCorpusFile )
			listTypeInfo = read_location_type_corpus( strCorpusFile, dict_geospatial_config )
			dictStreetTypes['title'].extend( listTypeInfo[0] )
			dictStreetTypes['type'].extend( listTypeInfo[1] )
			#
			# remove from name set all location title prefixes
			# these will otherwise be assumed to be part of a name (e.g. west = title prefix, west = boys name, west london is OK but joe london is not)
			#
			for listTitles in listTypeInfo[0] :
				for tupleEntry in listTitles :
					if tupleEntry[0] != 'suffix' :
						if tupleEntry[-1] in dict_geospatial_config['first_names'] :
							dict_geospatial_config['first_names'].remove( tupleEntry[-1] )

		# language specific admin types
		strCorpusFile = strCorpusDir + os.sep + 'corpus-admintype-' + strCode + '.txt'
		if os.path.isfile(strCorpusFile) :
			if dict_geospatial_config['logger'] != None :
				dict_geospatial_config['logger'].info( 'loading admin types from ' + strCorpusFile )
			listTypeInfo = read_location_type_corpus( strCorpusFile, dict_geospatial_config )
			dictAdminTypes['title'].extend( listTypeInfo[0] )
			dictAdminTypes['type'].extend( listTypeInfo[1] )
			#
			# remove from name set all location title prefixes
			# these will otherwise be assumed to be part of a name (e.g. west = title prefix, west = boys name, west london is OK but joe london is not)
			#
			for listTitles in listTypeInfo[0] :
				for tupleEntry in listTitles :
					if tupleEntry[0] != 'suffix' :
						if tupleEntry[-1] in dict_geospatial_config['first_names'] :
							dict_geospatial_config['first_names'].remove( tupleEntry[-1] )

	dict_geospatial_config['building_types'] = dictBuildingTypes
	dict_geospatial_config['street_types'] = dictStreetTypes
	dict_geospatial_config['admin_types'] = dictAdminTypes

	# load any available gazeteer sources
	dictGaz = {}
	for strCode in dictArgs['lang_codes'] :
		strGazFile = strCorpusDir + os.sep + 'gazeteer-' + strCode + '.txt'
		if os.path.isfile(strGazFile) :
			if dict_geospatial_config['logger'] != None :
				dict_geospatial_config['logger'].info( 'loading gazeteer from ' + strGazFile )
			readHandle = codecs.open( strGazFile, 'r', 'utf-8' )
			listGaz = []
			for line in readHandle :
				# remove newline at end (might not have one if last line)
				line = line.rstrip('\n')
				line = line.rstrip('\r')
				# remove UTF8 byte-order mark at start of file (added for UTF encoded files to indicate if its UTF-8, UTF-16 etc)
				line = line.lstrip( '\ufeff' )
				# create a list of [OSMID, OSMType, alt_name, ...] for later token expansion
				if (len(line) > 1) and (not line.startswith('#')) :
					listGazEntry = line.split(',')
					for nIndexGaz in range( len(listGazEntry) ) :
						listGazEntry[nIndexGaz] = listGazEntry[nIndexGaz].strip()
					if len(listGazEntry) < 3 :
						if dict_geospatial_config['logger'] != None :
							dict_geospatial_config['logger'].info( 'gaz file ' + strGazFile + ' has invalid OSMID entry with < 3 tokens (ignored - expected OSMID, OSMType, alt_name, ...)' )
						continue
					listGazEntry[0] = int( listGazEntry[0] )
					if (listGazEntry[1] != 'way') and (listGazEntry[1] != 'node') and (listGazEntry[1] != 'relation') :
						dict_geospatial_config['logger'].info( 'gaz file ' + strGazFile + ' has invalid OSMType entry - expected relation | way | node for ' + listGazEntry[1] )
					else :
						listGaz.append( listGazEntry )
			dictGaz[ strGazFile ] = listGaz
	dict_geospatial_config['gazeteers'] = dictGaz

	# all done
	return dict_geospatial_config
def is_good_place_name( phrase, dict_osm_tags, dict_geospatial_config ) :
	"""
	check if a phrase is a good placename (building, street, admin region etc.) for use in text matching. the OSM database contains some building names that are really house numbers (e.g. 50) and a few basic mistakes which need to be pruned to avoid poor quality matches. rejects short names, only numbers, only stoplist names. accepts short highway names e.g. 'M3' and multi-token admin regions.

	:param unicode phrase: OSM location phrase to check if it makes a good place name
	:param dict dict_osm_tags: OSM tags for this location
	:param dict dict_geospatial_config: config object returned from geoparse_lib.get_geoparse_config()

	:return: True if this is a good location name, False if it should be rejected for token matching
	:rtype: bool
	"""

	# check args without defaults
	if not isinstance( phrase, str ) :
		raise Exception( 'invalid phrase' )
	if not isinstance( dict_osm_tags, dict ) :
		raise Exception( 'invalid dict_osm_tags' )
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	# if in whitelist name is ok (check first so anything that matches is OK)
	if phrase in dict_geospatial_config['whitelist'] :
		return True

	# must always have more than 1 letter in phrase
	if len( phrase ) <= 1 :
		return False

	# if in blacklist name is not ok
	if phrase in dict_geospatial_config['blacklist'] :
		return False

	# get tokens
	listTokens = soton_corenlppy.common_parse_lib.tokenize_sentence( phrase, dict_geospatial_config )

	# reject phrases that are simply too small (except road names which might have M3 type abbreviations or official admin areas like USA)
	if (not 'highway' in dict_osm_tags) and (not 'admin level' in dict_osm_tags) :
		if len( phrase ) <= 3 :
			return False

	# reject 1g phrases unless they are roads (e.g. M3) or admin regions (e.g. London)
	# and reject all stop word phrases unless its a 2gram+ region name (as region names can be full of stop words)
	if ('admin level' in dict_osm_tags) or ('place' in dict_osm_tags) or ('is in' in dict_osm_tags) :
		# admin => allow all 2g+ and 1g phrases IF they are not stop words
		if len(listTokens) == 1 :
			if soton_corenlppy.common_parse_lib.is_all_stoplist( listTokens, dict_geospatial_config ) :
				return False
	elif ('highway' in dict_osm_tags) :
		# roads => allow any IF they are not stop words
		if soton_corenlppy.common_parse_lib.is_all_stoplist( listTokens, dict_geospatial_config ) :
			return False
	else :
		# other => reject 1g phrases and check 2g+ is not only stop words
		# since OpenStreetMap has a lot of rubbish 1g phrases like 'station', '24', 'building' ...
		if len(listTokens) == 1 :
			return False
		if soton_corenlppy.common_parse_lib.is_all_stoplist( listTokens, dict_geospatial_config ) :
			return False

	# reject phrases with only numbers
	# e.g. '24' (such as flat number 24)
	bValid = False
	for strToken in listTokens :
		if strToken.isdigit() == False :
			bValid = True
	if bValid == False :
		return False

	# lookup single tokens in wordnet dictionary
	# use all known languages for stoplist as we do not know the language these will be matched against in advance
	if dict_geospatial_config['use_wordnet'] == True :
		if len(listTokens) == 1 :
			# for strLangISO639_2 in dict_geospatial_config['lang_codes_ISO639_2'] :
			for strLangISO639_2 in wordnet.langs() :
				# if location name is also a non-noun word its a bad name
				listSyn = wordnet.synsets( phrase, pos='asrv', lang=strLangISO639_2 )
				if len(listSyn) > 0 :
					# dict_geospatial_config['logger'].info( 'WORDNET REJECT ' + phrase + ' : ' + strLangISO639_2 )
					return False

	# all done
	return True
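The basic reject rules of is_good_place_name() can be illustrated in isolation (a simplified sketch covering only the length and digits-only checks, without the OSM tag, stoplist and WordNet logic):

```python
def looks_bad(name):
    # reject names with 1 character or less
    if len(name) <= 1:
        return True
    # reject names that are only numbers, e.g. a flat number stored as a building name
    tokens = name.split()
    if all(token.isdigit() for token in tokens):
        return True
    return False

print(looks_bad('24'))      # True  (a house number, not a place name)
print(looks_bad('m3'))      # False (short highway names are kept by the real checks)
print(looks_bad('london'))  # False
```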
def expand_hashtag( phrase, dict_geospatial_config ) :
	"""
	return a hashtag for a phrase (expects clean phrase text)

	:param unicode phrase: OSM location phrase to check if it makes a good place name
	:param dict dict_geospatial_config: config object returned from geoparse_lib.get_geoparse_config()

	:return: hashtag text
	:rtype: unicode
	"""

	# check args without defaults
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	if len(phrase) == 0 :
		return ''

	strHashtag = copy.deepcopy( phrase )

	# remove spaces to make a hashtag word (e.g. newyork)
	strHashtag = strHashtag.replace( ' ','' )
	if len(strHashtag) == 0 :
		return ''

	return '#' + strHashtag
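The hashtag expansion is simple enough to sketch standalone (same behaviour as expand_hashtag() above, minus the config argument and the deep copy):

```python
def hashtag(phrase):
    # remove spaces to make a hashtag word (e.g. 'new york' -> '#newyork')
    word = phrase.replace(' ', '')
    return '#' + word if word else ''

print(hashtag('new york'))  # '#newyork'
print(hashtag(''))          # ''
```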
def expand_osm_alternate_names( tuple_osmid, phrase, dict_osm_tags, dict_geospatial_config ) :
	"""
	return a list of location names expanded to include OSM ref, alt, language variants, street and building type variants etc. for example 'London St' will generate ['London Street', 'London St'].

	:param tuple tuple_osmid: tuple of OSM IDs that represent this location. locations such as roads can have multiple OSM IDs which represent different ways along the length of the road.
	:param unicode phrase: cleaned name (not stemmed) of OSM location which should be expanded
	:param dict dict_osm_tags: OSM tags for this location
	:param dict dict_geospatial_config: config object returned from geoparse_lib.get_geoparse_config()

	:return: list of name variants for this location phrase (including the original phrase itself)
	:rtype: list
	"""

	# check args without defaults
	if not isinstance( tuple_osmid, tuple ) :
		raise Exception( 'invalid tuple_osmid' )
	if not isinstance( dict_osm_tags, dict ) :
		raise Exception( 'invalid dict_osm_tags' )
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	# always keep the initial phrase
	if is_good_place_name( phrase, dict_osm_tags, dict_geospatial_config ) :
		listResult = [phrase]
	else :
		listResult = []

	# compile a list of tags to check
	# note: 'alt_name:en' actually appears in database as 'alt name:en'
	listTagsToCheck = [ 'name', 'ref', 'loc ref', 'nat ref', 'old ref', 'reg ref', 'ISO3166-1', 'ISO3166-1:alpha2', 'ISO3166-1:alpha3' ]
	listTagsToCheck.extend( [ 'alt name', 'alt name:1', 'alt name:2', 'int name', 'loc name', 'nat name', 'old name', 'reg name', 'short name', 'name:abbreviation', 'name:simple', 'sorting name' ] )
	for strLangCode in dict_geospatial_config['lang_codes'] :
		listTagsToCheck.extend( [ 'name:' + strLangCode, 'alt name:' + strLangCode, 'old name:' + strLangCode ] )

	# check for OSM reference tags and add them (avoid duplicates)
	for strTag in listTagsToCheck :
		if strTag in dict_osm_tags :
			# get name
			strPhraseTag = dict_osm_tags[ strTag ]

			# remove (...) in cases like 'Montreal (06)' that do exist alas
			if '(' in strPhraseTag :
				strPhraseTag = strPhraseTag[ : strPhraseTag.index('(') ]

			# clean name
			strPhraseTag = soton_corenlppy.common_parse_lib.clean_text( strPhraseTag, dict_geospatial_config )

			# check its a good name
			if (len(strPhraseTag) > 0) and (not strPhraseTag in listResult) :
				if is_good_place_name( strPhraseTag, dict_osm_tags, dict_geospatial_config ) == True :
					listResult.append( strPhraseTag )

	# expand building names
	# also use OSM types if possible to add a type to end of building so it triggers expansion
	bBuilding = False
	if 'railway' in dict_osm_tags :
		if dict_osm_tags['railway'] == 'station' :
			bBuilding = True
			if 'en' in dict_geospatial_config['lang_codes'] :
				if not ' station' in phrase :
					listResult.append( phrase + ' station' )
				if 'station' in dict_osm_tags :
					if dict_osm_tags['station'] == 'subway' :
						if not 'subway' in phrase :
							listResult.append( phrase + ' subway' )
						if not 'metro' in phrase :
							listResult.append( phrase + ' metro' )
						if not 'underground' in phrase :
							listResult.append( phrase + ' underground' )
	elif 'amenity' in dict_osm_tags :
		# for buildings use its (english) type as a suffix for token expansion
		# e.g. Le Bataclan = Le Bataclan Theatre
		bBuilding = True
		if 'en' in dict_geospatial_config['lang_codes'] :
			strType = dict_osm_tags['amenity'].replace('_',' ')
			if (strType != 'yes') and (strType != 'user defined') :
				if not ' ' + strType in phrase :
					listResult.append( phrase + ' ' + strType )
	elif 'building' in dict_osm_tags :
		bBuilding = True
		if 'en' in dict_geospatial_config['lang_codes'] :
			strType = dict_osm_tags['building'].replace('_',' ')
			if (strType != 'yes') and (strType != 'user defined') :
				if not ' ' + strType in phrase :
					listResult.append( phrase + ' ' + strType )

	if bBuilding == True :
		if dict_geospatial_config['building_types'] != None :
			listResult = expand_osm_location_types( listResult, dict_geospatial_config['building_types'], dict_osm_tags, dict_geospatial_config )

	# expand street names (dont allow buildings)
	if bBuilding == False :
		if 'highway' in dict_osm_tags :
			if dict_geospatial_config['street_types'] != None :
				listResult = expand_osm_location_types( listResult, dict_geospatial_config['street_types'], dict_osm_tags, dict_geospatial_config )

	# expand admin region names (dont allow buildings)
	if bBuilding == False :
		if ('admin level' in dict_osm_tags) or ('place' in dict_osm_tags) :
			if dict_geospatial_config['admin_types'] != None :
				listResult = expand_osm_location_types( listResult, dict_geospatial_config['admin_types'], dict_osm_tags, dict_geospatial_config )

	# expand gazeteer entries for this OSMID (if any)
	if dict_geospatial_config['gazeteers'] != None :
		for strGaz in dict_geospatial_config['gazeteers'] :
			for nIndexGaz in range( len(dict_geospatial_config['gazeteers'][strGaz]) ) :
				# in OSM relations have a negative ID, nodes and ways positive
				nID = dict_geospatial_config['gazeteers'][strGaz][nIndexGaz][0]
				strType = dict_geospatial_config['gazeteers'][strGaz][nIndexGaz][1]
				if strType == 'relation' :
					nID = -1 * nID
				if nID in tuple_osmid :
					for strPhrase in dict_geospatial_config['gazeteers'][strGaz][nIndexGaz][2:] :
						strPhraseClean = soton_corenlppy.common_parse_lib.clean_text( strPhrase, dict_geospatial_config )
						if not strPhraseClean in listResult :
							listResult.append( strPhraseClean )

	# add hashtag versions of all expanded phrases
	for nIndex in range(len(listResult)) :
		strHashtag = expand_hashtag( listResult[nIndex], dict_geospatial_config )
		if len(strHashtag) > 0 :
			listResult.append( strHashtag )

	# return what we have
	return listResult
def calc_OSM_type( dict_osm_tags, dict_geospatial_config ) :
	"""
	check the OSM tags to work out the type of OSM location. this is especially useful for high level filtering and visualization as OSM tags are quite detailed

	:param dict dict_osm_tags: OSM tags for this location
	:param dict dict_geospatial_config: config object returned from geoparse_lib.get_geoparse_config()

	:return: transport | building | admin | other
	:rtype: str
	"""

	# check args without defaults
	if not isinstance( dict_osm_tags, dict ) :
		raise Exception( 'invalid dict_osm_tags' )
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	# note: all _ are replaced with ' ' before dictOSMTags is computed
	if 'admin level' in dict_osm_tags :
		return 'admin'
	if 'place' in dict_osm_tags :
		return 'admin'
	if 'railway' in dict_osm_tags :
		return 'transport'
	if 'highway' in dict_osm_tags :
		return 'transport'
	if 'aerialway' in dict_osm_tags :
		return 'transport'
	if 'public transport' in dict_osm_tags :
		return 'transport'
	if 'route' in dict_osm_tags :
		return 'transport'

	if 'amenity' in dict_osm_tags :
		# check for transport amenities
		strValue = dict_osm_tags['amenity']
		if strValue == 'bus station' :
			return 'transport'
		if strValue == 'ev charging' :
			return 'transport'
		if strValue == 'charging station' :
			return 'transport'
		if strValue == 'ferry terminal' :
			return 'transport'
		if strValue == 'fuel' :
			return 'transport'
		if strValue == 'parking' :
			return 'transport'
		if strValue == 'parking entrance' :
			return 'transport'
		if strValue == 'parking space' :
			return 'transport'
		if strValue == 'taxi' :
			return 'transport'
		# default amenity type is a building
		return 'building'

	if 'building' in dict_osm_tags :
		# check for transport buildings
		strValue = dict_osm_tags['building']
		if strValue == 'train station' :
			return 'transport'
		if strValue == 'transportation' :
			return 'transport'
		if strValue == 'hangar' :
			return 'transport'
		# default building is a building!
		return 'building'

	if 'landuse' in dict_osm_tags :
		# check for transport uses
		strValue = dict_osm_tags['landuse']
		if strValue == 'railway' :
			return 'transport'

	if 'man made' in dict_osm_tags :
		# check for man made buildings
		strValue = dict_osm_tags['man made']
		if strValue == 'bunker silo' :
			return 'building'
		if strValue == 'lighthouse' :
			return 'building'
		if strValue == 'silo' :
			return 'building'
		if strValue == 'wastewater plant' :
			return 'building'
		if strValue == 'watermill' :
			return 'building'
		if strValue == 'water tower' :
			return 'building'
		if strValue == 'water works' :
			return 'building'
		if strValue == 'windmill' :
			return 'building'
		if strValue == 'works' :
			return 'building'

	if 'military' in dict_osm_tags :
		# check for military buildings
		strValue = dict_osm_tags['military']
		if strValue == 'airfield' :
			return 'transport'
		if strValue == 'bunker' :
			return 'building'
		if strValue == 'barracks' :
			return 'building'
		if strValue == 'naval base' :
			return 'building'

	if 'shop' in dict_osm_tags :
		return 'building'

	if 'tourism' in dict_osm_tags :
		# check for tourist buildings
		strValue = dict_osm_tags['tourism']
		if strValue == 'alpine hut' :
			return 'building'
		if strValue == 'chalet' :
			return 'building'
		if strValue == 'gallery' :
			return 'building'
		if strValue == 'guest house' :
			return 'building'
		if strValue == 'hostel' :
			return 'building'
		if strValue == 'hotel' :
			return 'building'
		if strValue == 'motel' :
			return 'building'
		if strValue == 'museum' :
			return 'building'
		if strValue == 'theme park' :
			return 'building'
		if strValue == 'wilderness hut' :
			return 'building'
		if strValue == 'zoo' :
			return 'building'

	# anything else return other
	return 'other'
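The precedence order used by calc_OSM_type() can be summarised with a condensed sketch (only a few representative tags are shown; the real function checks many more tag values):

```python
def osm_type(tags):
    # first matching tag wins, mirroring the precedence order of the full function
    if 'admin level' in tags or 'place' in tags:
        return 'admin'
    if any(k in tags for k in ('railway', 'highway', 'aerialway',
                               'public transport', 'route')):
        return 'transport'
    if any(k in tags for k in ('amenity', 'building', 'shop', 'tourism')):
        return 'building'
    return 'other'

print(osm_type({'highway': 'residential'}))           # transport
print(osm_type({'admin level': '6', 'highway': ''}))  # admin (checked first)
print(osm_type({}))                                   # other
```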
def calc_OSM_linkedgeodata_uri( tuple_osmid, geom ) :
	"""
	return a linkedgeodata URI for this OSMID (first ID in tuple only)

	:param tuple tuple_osmid: tuple of OSM IDs that represent this location. locations such as roads can have multiple OSM IDs which represent different ways along the length of the road.
	:param str geom: serialized OpenGIS geometry object e.g. 'POINT( lon lat )'

	:return: URI to linkedgeodata for first OSM ID in tuple
	:rtype: str
	"""

	if (not isinstance( tuple_osmid, tuple )) or (len(tuple_osmid) == 0) :
		raise Exception( 'invalid tuple_osmid' )
	if not isinstance( geom, str ) :
		raise Exception( 'invalid geom' )

	# we will only provide a URI for the first OSMID if the location is a complex one
	nOSMIDFirst = tuple_osmid[0]

	# relations have a negative number
	if nOSMIDFirst < 0 :
		return "http://linkedgeodata.org/page/triplify/relation" + str( -1 * nOSMIDFirst )

	# nodes are points
	if geom.lower().startswith( 'point(' ) :
		return "http://linkedgeodata.org/page/triplify/node" + str( nOSMIDFirst )

	# otherwise assume its a way
	return "http://linkedgeodata.org/page/triplify/way" + str( nOSMIDFirst )
def calc_OSM_uri( tuple_osmid, geom ) :
	"""
	return an openstreetmap URI for this OSMID (first ID in tuple only)

	:param tuple tuple_osmid: tuple of OSM IDs that represent this location. locations such as roads can have multiple OSM IDs which represent different ways along the length of the road.
	:param str geom: serialized OpenGIS geometry object e.g. 'POINT( lon lat )'

	:return: URI to openstreetmap for first OSM ID in tuple
	:rtype: str
	"""

	if (not isinstance( tuple_osmid, tuple )) or (len(tuple_osmid) == 0) :
		raise Exception( 'invalid tuple_osmid' )
	if not isinstance( geom, str ) :
		raise Exception( 'invalid geom' )

	# we will only provide a URI for the first OSMID if the location is a complex one
	nOSMIDFirst = tuple_osmid[0]

	# relations have a negative number
	if nOSMIDFirst < 0 :
		return "http://www.openstreetmap.org/relation/" + str( -1 * nOSMIDFirst )

	# nodes are points
	if geom.lower().startswith( 'point(' ) :
		return "http://www.openstreetmap.org/node/" + str( nOSMIDFirst )

	# otherwise assume its a way
	return "http://www.openstreetmap.org/way/" + str( nOSMIDFirst )
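Both URI builders share the same rule, sketched here standalone for the openstreetmap.org form (relations are stored with negative IDs; a POINT geometry implies a node; anything else is assumed to be a way):

```python
def osm_uri(tuple_osmid, geom):
    first = tuple_osmid[0]
    if first < 0:
        # relations have a negative number
        return 'http://www.openstreetmap.org/relation/' + str(-first)
    if geom.lower().startswith('point('):
        # nodes are points
        return 'http://www.openstreetmap.org/node/' + str(first)
    # otherwise assume it is a way
    return 'http://www.openstreetmap.org/way/' + str(first)

print(osm_uri((-62149,), 'POLYGON((0 0,1 0,1 1,0 0))'))
print(osm_uri((123,), 'POINT(-1.4 50.9)'))
```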
[docs]def expand_osm_location_types( list_location_names, dict_location_types, dict_osm_tags, dict_geospatial_config ) :
	"""
	given a set of location names return an expanded list with all known location type name variants. the original location name will always appear in the variant list.
	e.g. ['london st'] -> [[ 'london st', 'london street' ]]

	:param list list_location_names: list of clean location phrase variants for this location (e.g. full name, short name and abbreviation)
	:param dict dict_location_types: dict of type prefixes and type variants in form { 'title' : listTypePattern, 'type' : listTypePattern }. listTypePattern is generated using read_location_type_corpus()
	:param dict dict_osm_tags: OSM tags for this location
	:param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()

	:return: expanded list of location phrase variants
	:rtype: list
	"""

	# dictLocationTypes = { 'title' : listTitleSet, 'type' : listTypeSet }
	# listTitleSet = listTypeSet = [ [ ('prefix' | 'suffix' | 'both', token, token ... ) ... N_phrase_variants ], ... N_types ]
	# phrase = (token1, token2, ...)
	# e.g. [ [ ('suffix','clinic'), ('suffix','day','centre') ], [ ('both','uni'),('both','university') ] ]

	# check args without defaults
	if not isinstance( list_location_names, list ) :
		raise Exception( 'invalid list_location_names' )
	if not isinstance( dict_location_types, dict ) :
		raise Exception( 'invalid dict_location_types' )
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	# always keep the initial list of variant names
	listResult = copy.deepcopy( list_location_names )

	# loop on location name variants
	for nIndexPhrase in range(len(list_location_names)) :

		# get phrase
		strPhrase = list_location_names[nIndexPhrase]

		# note: phrase is expected to be clean, with spaces delimiting words
		# note: convert to tuple as we will be matching it to type phrases (which are tuples) and list != tuple
		tupleTokens = tuple( soton_corenlppy.common_parse_lib.tokenize_sentence( strPhrase, dict_geospatial_config ) )

		# remove first found title prefix or suffix from location name
		# note: types are sorted by gram order so bigrams will match before unigrams
		# note: this has a weakness: 'the big building org' will match title 'the' ==> not 'org' as well
		tupleTokensStripped = tupleTokens
		tupleTitlePrefix = ()
		tupleTitleSuffix = ()

		#
		# check for known place titles (take first match)
		#
		for tupleStopTokens in dict_location_types['title'] :
			if len(tupleTokens) >= len(tupleStopTokens)-1 :

				# check stop words in prefix
				if tupleStopTokens[0] == 'prefix' or tupleStopTokens[0] == 'both' :
					if tupleStopTokens[1:] == tupleTokens[:len(tupleStopTokens)-1] :
						tupleTitlePrefix = tupleStopTokens[1:]
						tupleTokensStripped = tupleTokens[len(tupleStopTokens)-1:]
						break

				# check stop words in suffix
				if tupleStopTokens[0] == 'suffix' or tupleStopTokens[0] == 'both' :
					if tupleStopTokens[1:] == tupleTokens[-1*(len(tupleStopTokens)-1):] :
						tupleTitleSuffix = tupleStopTokens[1:]
						tupleTokensStripped = tupleTokens[:-1*(len(tupleStopTokens)-1)]
						break

		# do not allow location names where the title is the whole name
		# such location names are not informative enough to be useful for accurate matching
		# e.g. location = 'north london' ==> expand('london', 'north london')
		# e.g. location = 'north' ==> reject
		if len(tupleTokensStripped) == 0 :
			# delete this location name entirely as it is a
			# match to a location type name and not informative enough
			# note: listResult has all the listVariants tokens so we just void the bad one
			listResult[nIndexPhrase] = ''
			continue

		# add the phrase stripped of any title words to the variant list (even if it does not contain a type)
		strStrippedPhrase = ' '.join( tupleTokensStripped )
		if not strStrippedPhrase in listResult :
			if is_good_place_name( strStrippedPhrase, dict_osm_tags, dict_geospatial_config ) == True :
				listResult.append( strStrippedPhrase )

		# loop on each location type and take first match found
		# note: types are sorted by gram order so bigrams will match before unigrams
		tupleTokensNoType = ()
		listMatchedLocationType = []
		bMatched = False
		for listLocationType in dict_location_types[ 'type' ] :

			# listLocationType = [ ('suffix','t1.1','t1.2'), ('both','t2.1','t2.2', ... ) ]

			# ignore types with the * no_match prefix
			# e.g. '*high school' --> ('suffix', '*high', 'school')
			for tuplePhrase in listLocationType :
				if len(tuplePhrase)-1 <= len(tupleTokensStripped) :
					if not tuplePhrase[1].startswith( '*' ) :

						if tuplePhrase[0] == 'prefix' or tuplePhrase[0] == 'both' :
							if tuplePhrase[1:] == tupleTokensStripped[:len(tuplePhrase)-1] :
								# strip the matched location type from the location name
								tupleTokensNoType = tupleTokensStripped[len(tuplePhrase)-1:]
								listMatchedLocationType = listLocationType
								bMatched = True
								break

						if tuplePhrase[0] == 'suffix' or tuplePhrase[0] == 'both' :
							if tuplePhrase[1:] == tupleTokensStripped[-1*(len(tuplePhrase)-1):] :
								# strip the matched location type from the location name
								tupleTokensNoType = tupleTokensStripped[:-1*(len(tuplePhrase)-1)]
								listMatchedLocationType = listLocationType
								bMatched = True
								break

			if bMatched == True :
				break

		# if no match the original phrase is used as it has no known type to expand
		if bMatched == False :
			continue

		# do not allow location names where the type is the whole name
		# such location names are not informative enough to be useful for accurate matching
		# e.g. location = 'nuffield hospital' ==> expand('hospital')
		# e.g. location = 'hospital' ==> reject
		if len(tupleTokensNoType) == 0 :
			# delete this location name entirely as it is a
			# match to a location type name and not informative enough
			# note: listResult has all the listVariants tokens so we just void the bad one
			listResult[nIndexPhrase] = ''

		# generate all variants of the location name using the location type variant list
		elif len(tupleTokensNoType) > 0 :

			for tuplePhrase in listMatchedLocationType :

				if tuplePhrase[0] == 'prefix' or tuplePhrase[0] == 'both' :

					# note: use a list as tuples are fixed in size
					listNewPhrase = list( tuplePhrase[1:] )
					listNewPhrase.extend( tupleTokensNoType )

					# add new phrase without any titles (if not already present)
					strNewPhrase = ' '.join( listNewPhrase ).replace('*','')
					if not strNewPhrase in listResult :
						if is_good_place_name( strNewPhrase, dict_osm_tags, dict_geospatial_config ) == True :
							listResult.append( strNewPhrase )

					# add new phrase with title prefix (if not already present)
					if len(tupleTitlePrefix) > 0 :
						# note: convert the tuple to a list so we can extend it
						listNewPhraseTitle = list( tupleTitlePrefix )
						listNewPhraseTitle.extend( listNewPhrase )
						strNewPhrase = ' '.join( listNewPhraseTitle ).replace('*','')
						if not strNewPhrase in listResult :
							if is_good_place_name( strNewPhrase, dict_osm_tags, dict_geospatial_config ) == True :
								listResult.append( strNewPhrase )

					# add new phrase with title suffix (if not already present)
					if len(tupleTitleSuffix) > 0 :
						listNewPhraseTitle = copy.deepcopy( listNewPhrase )
						listNewPhraseTitle.extend( tupleTitleSuffix )
						strNewPhrase = ' '.join( listNewPhraseTitle ).replace('*','')
						if not strNewPhrase in listResult :
							if is_good_place_name( strNewPhrase, dict_osm_tags, dict_geospatial_config ) == True :
								listResult.append( strNewPhrase )

				if tuplePhrase[0] == 'suffix' or tuplePhrase[0] == 'both' :

					# note: use a list as tuples are fixed in size
					listNewPhrase = list( tupleTokensNoType )
					listNewPhrase.extend( tuplePhrase[1:] )

					# add new phrase without any titles (if not already present)
					strNewPhrase = ' '.join( listNewPhrase ).replace('*','')
					if not strNewPhrase in listResult :
						if is_good_place_name( strNewPhrase, dict_osm_tags, dict_geospatial_config ) == True :
							listResult.append( strNewPhrase )

					# add new phrase with title prefix (if not already present)
					if len(tupleTitlePrefix) > 0 :
						# note: convert the tuple to a list so we can extend it
						listNewPhraseTitle = list( tupleTitlePrefix )
						listNewPhraseTitle.extend( listNewPhrase )
						strNewPhrase = ' '.join( listNewPhraseTitle ).replace('*','')
						if not strNewPhrase in listResult :
							if is_good_place_name( strNewPhrase, dict_osm_tags, dict_geospatial_config ) == True :
								listResult.append( strNewPhrase )

					# add new phrase with title suffix (if not already present)
					if len(tupleTitleSuffix) > 0 :
						listNewPhraseTitle = copy.deepcopy( listNewPhrase )
						listNewPhraseTitle.extend( tupleTitleSuffix )
						strNewPhrase = ' '.join( listNewPhraseTitle ).replace('*','')
						if not strNewPhrase in listResult :
							if is_good_place_name( strNewPhrase, dict_osm_tags, dict_geospatial_config ) == True :
								listResult.append( strNewPhrase )

	# remove any empty names
	while listResult.count('') :
		listResult.remove('')

	# return what we have after expansion
	return listResult
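The core expansion idea (match one type variant as a suffix, then substitute every sibling variant from the same variant set) can be shown in isolation. This is an illustrative sketch, not the library API: `expand_suffix_variants` is a hypothetical helper and handles only the suffix case, without the title-prefix/suffix or `is_good_place_name()` filtering above.

```python
# Sketch of type-variant expansion: if the name ends with one variant of a
# type (e.g. 'st'), emit the name rebuilt with each sibling variant ('street').
def expand_suffix_variants(name_tokens, type_variants):
    # type_variants e.g. [('suffix', 'st'), ('suffix', 'street')]
    results = [' '.join(name_tokens)]
    for variant in type_variants:
        suffix = variant[1:]
        if tuple(name_tokens[-len(suffix):]) == suffix:
            stem = name_tokens[:-len(suffix)]
            for other in type_variants:
                # '*' marks expansion-only tokens; strip it from output
                candidate = ' '.join(list(stem) + [t.lstrip('*') for t in other[1:]])
                if candidate not in results:
                    results.append(candidate)
            break
    return results

print(expand_suffix_variants(['london', 'st'], [('suffix', 'st'), ('suffix', 'street')]))
# ['london st', 'london street']
```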
[docs]def read_location_type_corpus( filename, dict_geospatial_config ) :
	"""
	read a location type corpus containing information for location prefix variants (e.g. north london) and location type name variants (e.g. london street, london st)

	| note : variants are encoded as a list [ [ ('prefix' | 'suffix' | 'both', token, token ... ) ... ], ... ]
	| e.g. [ [ ('suffix','clinic'), ('suffix','day','centre') ], [ ('both','uni'),('both','university') ] ]

	| corpus file syntax :
	| title, ... for location title words not part of the type (e.g. north)
	| type, ... for location type words (e.g. hospital)
	| \# for comments
	| +<phrase> = prefix to location name
	| <phrase>+ = suffix to location name
	| +<phrase>+ = can be both prefix and suffix
	| tokens starting with a '*' indicate that the phrase is not to be used for an initial match, but will be used for expansion variants
	| e.g. primary, *school --> matches only '<name> primary' BUT will expand to '<name> primary', '<name> school' since there are other types of school that could match erroneously

	:param str filename: filename of location type corpus
	:param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()

	:return: list of prefix variants and type variants
	:rtype: list
	"""

	# check args without defaults
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	if dict_geospatial_config['logger'] != None :
		dict_geospatial_config['logger'].info( 'loading location type corpus ' + filename )

	listTitleSet = []
	listTypeSet = []

	# make a custom whitespace allowing '*' which will be the no match flag
	strUserWhitespace = copy.deepcopy( dict_geospatial_config['whitespace'] )
	strUserWhitespace = strUserWhitespace.replace( '*', '' )

	# open file
	readHandle = codecs.open( filename, 'r', 'utf-8' )

	# read each line in document and process it
	nLineCount = 0
	for line in readHandle :
		nLineCount = nLineCount + 1

		# remove newline at end (might not have one if last line)
		line = line.rstrip( '\n' )
		line = line.rstrip( '\r' )

		# remove UTF8 byte-order mark at start of file (added to UTF encoded files to indicate if it is UTF-8, UTF-16 etc)
		line = line.lstrip( '\ufeff' )

		if (len(line) > 1) and (not line.startswith('#')) :

			# tokenize using comma delimiter and make lowercase
			listEntries = line.lower().split(',')
			if len( listEntries ) > 0 :

				bType = False
				bTitle = False
				if listEntries[0].strip() == 'title' :
					bTitle = True
				elif listEntries[0].strip() == 'type' :
					bType = True
				else :
					raise Exception( 'row found without a title | type prefix : ' + str(filename) + ' : line ' + str(nLineCount) )

				#
				# create entry by parsing special tokens (comma delimited phrase set)
				#
				listEntryVariants = []
				for strPhrase in listEntries[1:] :

					# parse special characters
					strPhraseStripped = copy.deepcopy( strPhrase.strip() )

					bNoMatch = False
					bPrefix = False
					bSuffix = False

					if strPhraseStripped.startswith('*') :
						strPhraseStripped = strPhraseStripped[1:]
						bNoMatch = True

					if strPhraseStripped.startswith('+') :
						strPhraseStripped = strPhraseStripped[1:]
						bPrefix = True

					if strPhraseStripped.endswith('+') :
						strPhraseStripped = strPhraseStripped[:-1]
						bSuffix = True

					# clean
					strTextClean = soton_corenlppy.common_parse_lib.clean_text( strPhraseStripped, dict_geospatial_config )

					# tokenize phrase
					listTokens = soton_corenlppy.common_parse_lib.tokenize_sentence( strTextClean, dict_geospatial_config )
					if len(listTokens) > 0 :

						# keep * in front of 1st token of phrases we dont want to match
						if bNoMatch == True :
							listTokens[0] = '*' + listTokens[0]

						# apply prefix|suffix|both labelling
						if bPrefix and bSuffix :
							listTokens.insert( 0,'both' )
						elif bSuffix :
							listTokens.insert( 0,'suffix' )
						elif bPrefix :
							listTokens.insert( 0,'prefix' )
						else :
							raise Exception( 'phrase found without a prefix/suffix indicator : ' + str(filename) + ' : line ' + str(nLineCount) )

						# add tokens to list
						tuplePhrase = tuple( listTokens )
						if not tuplePhrase in listEntryVariants :
							listEntryVariants.append( tuplePhrase )

				if bTitle == True :
					listTitleSet.append( listEntryVariants )
				if bType == True :
					listTypeSet.append( listEntryVariants )

	# all done
	readHandle.close()

	if dict_geospatial_config['logger'] != None :
		dict_geospatial_config['logger'].info( '- ' + str(len(listTitleSet)) + ' unique titles' )
		dict_geospatial_config['logger'].info( '- ' + str(len(listTypeSet)) + ' unique types' )

	# sort all token sets so the tokens appear in gram order
	# ensuring highest gram gets matched first
	for nIndex in range(len(listTitleSet)) :
		listTitleSet[nIndex] = sorted( listTitleSet[nIndex], key=len, reverse=True )
	for nIndex in range(len(listTypeSet)) :
		listTypeSet[nIndex] = sorted( listTypeSet[nIndex], key=len, reverse=True )

	# debug (pretty print of place names)
	"""
	if dict_geospatial_config['logger'] != None :
		for listEntry in listTitleSet :
			dict_geospatial_config['logger'].info( 'TITLE = ' + repr(listEntry) )
		for listEntry in listTypeSet :
			dict_geospatial_config['logger'].info( 'TYPE = ' + repr(listEntry) )
	"""

	# all done
	return [listTitleSet,listTypeSet]
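The corpus phrase syntax described in the docstring ('+' marking prefix/suffix placement, '*' marking a no-match expansion-only phrase) can be sketched as a standalone parser. `parse_corpus_phrase` is a hypothetical helper using plain `split()` instead of the library's tokenizer.

```python
# Sketch of the corpus phrase syntax: '+word' = prefix, 'word+' = suffix,
# '+word+' = both, leading '*' = expansion-only (kept on the 1st token).
def parse_corpus_phrase(phrase):
    phrase = phrase.strip().lower()
    no_match = phrase.startswith('*')
    if no_match:
        phrase = phrase[1:]
    prefix = phrase.startswith('+')
    if prefix:
        phrase = phrase[1:]
    suffix = phrase.endswith('+')
    if suffix:
        phrase = phrase[:-1]
    tokens = phrase.split()
    if no_match:
        tokens[0] = '*' + tokens[0]
    if prefix and suffix:
        label = 'both'
    elif suffix:
        label = 'suffix'
    elif prefix:
        label = 'prefix'
    else:
        raise ValueError('phrase found without a prefix/suffix indicator')
    return tuple([label] + tokens)

print(parse_corpus_phrase('+university+'))  # ('both', 'university')
print(parse_corpus_phrase('*school+'))      # ('suffix', '*school')
```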
[docs]def calc_geom_index( list_data, index_geom = 4, index_id = 2, index_osm_tag = 5 ) :
	"""
	compile an index of shapely geoms from a list of data where one column is a geom. there can be several geoms for each osmid as island groups can have a geom per island, but still have a single osmid and name (e.g. Shetland, UK). the key for this index will either be the original data list row number OR the value from an ID column if provided. an OSM tag column can optionally be provided to append OSM tag data to the end of the geom index entry, which can be useful for determining the location type each geom refers to (e.g. admin region, road)

	:param list list_data: list of data rows where one of the columns contains a serialized OpenGIS geom
	:param int index_geom: column index in list of data for geom
	:param int index_id: column index in list of data for id (can be None)
	:param int index_osm_tag: column index in list of data for OSM tag dict (can be None)

	:return: dict of { id : [ ( shapely_geom_prepared, shapely_envelope, shapely_geom, shapely_representative_point, dict_OSM_tag, geom_serialized ), ...] }
	:rtype: dict
	"""

	# check args without defaults
	if not isinstance( list_data, list ) :
		raise Exception( 'invalid list_data' )
	if not isinstance( index_geom, int ) :
		raise Exception( 'invalid index_geom' )

	# anything to do ?
	if len(list_data) == 0 :
		return {}

	# enable speedups if supported
	# note: this is important as it enables C compiled functions to replace py functions
	# which can give an order of magnitude speedup
	if shapely.speedups.available == True :
		shapely.speedups.enable()

	# check index OK
	if len(list_data[0]) <= index_geom :
		raise Exception( 'geom index >= len(source data)' )
	if (index_id != None) and (len(list_data[0]) <= index_id) :
		raise Exception( 'ID index >= len(source data)' )
	if (index_osm_tag != None) and (len(list_data[0]) <= index_osm_tag) :
		raise Exception( 'OSM tag index >= len(source data)' )

	dictGeomIndex = {}
	for nDocIndex in range(len(list_data)) :

		# get source geom
		strGeom = list_data[nDocIndex][index_geom]
		if not isinstance( strGeom, str ) :
			raise Exception( 'object type of geom not str : ' + str(type(strGeom)) )

		# get ID tuple
		if index_id != None :
			if isinstance( list_data[nDocIndex][index_id], list ) :
				tupleID = tuple( list_data[nDocIndex][index_id] )
			elif isinstance( list_data[nDocIndex][index_id], tuple ) :
				tupleID = list_data[nDocIndex][index_id]
			elif isinstance( list_data[nDocIndex][index_id], str ) :
				tupleID = tuple( [list_data[nDocIndex][index_id]] )
			else :
				raise Exception( 'object type of ID not list,tuple,str' )
		else :
			tupleID = (nDocIndex,)

		# get OSM tag dict
		if index_osm_tag != None :
			dictOSMTag = list_data[nDocIndex][index_osm_tag]
			if not isinstance( dictOSMTag, dict ) :
				raise Exception( 'object type of OSM tag not dict' )
		else :
			dictOSMTag = {}

		# add shape to dict if not already in index
		# note: use a prepared object for more efficient batch processing later
		bAdd = True
		if not tupleID in dictGeomIndex :
			dictGeomIndex[ tupleID ] = []
		for entry in dictGeomIndex[ tupleID ] :
			if entry[5] == strGeom :
				bAdd = False
		if bAdd == True :
			shapeGeom = shapely.wkt.loads( strGeom )
			shapeGeomSimple = shapeGeom.envelope
			shapeGeomPrep = shapely.prepared.prep(shapeGeom)
			shapeGeomSimplePrep = shapely.prepared.prep(shapeGeomSimple)
			pointRepresentative = shapeGeom.representative_point()
			dictGeomIndex[ tupleID ].append( (shapeGeomPrep, shapeGeomSimplePrep, shapeGeom, pointRepresentative, dictOSMTag, strGeom) )

	# return geom index
	return dictGeomIndex
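The reason the index stores a prepared envelope alongside the full geometry is the cheap bounding-box rejection used later. The same idea can be sketched without shapely, using plain tuples; `envelope` and `envelopes_intersect` are hypothetical helper names.

```python
# Sketch of the envelope-first test: a cheap bounding-box rejection run
# before any expensive exact geometry intersection check.
def envelope(points):
    # axis-aligned bounding box (min_x, min_y, max_x, max_y) of a point list
    xs = [p[0] for p in points]
    ys = [p[1] for p in points]
    return (min(xs), min(ys), max(xs), max(ys))

def envelopes_intersect(a, b):
    # boxes intersect unless one lies entirely left/right/above/below the other
    return not (a[2] < b[0] or b[2] < a[0] or a[3] < b[1] or b[3] < a[1])

poly = [(0, 0), (4, 0), (4, 3), (0, 3)]
print(envelopes_intersect(envelope(poly), (1, 1, 2, 2)))  # True
print(envelopes_intersect(envelope(poly), (5, 5, 6, 6)))  # False
```

Only candidates that pass this box test need the full (slow) polygon intersection, which is why the envelope check is described below as orders of magnitude quicker for complex borders.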
[docs]def geoparse_token_set( token_set, dict_inverted_index, dict_geospatial_config ) :
	"""
	geoparse token sets using a set of cached locations. no location disambiguation is performed here so all possible location matches will be returned for each token

	:param list token_set: list of tokens to geoparse
	:param dict dict_inverted_index: inverted index of cached locations from soton_corenlppy.common_parse_lib.calc_inverted_index()
	:param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()

	:return: list of location indexes for each matched phrase [ [ (token_start, token_end, set(doc_index1, doc_index2 ...), matched_tokens), ... ], ... ]
	:rtype: list
	"""

	# check args without defaults
	if not isinstance( token_set, list ) :
		raise Exception( 'invalid token_set' )
	if not isinstance( dict_inverted_index, dict ) :
		raise Exception( 'invalid dict_inverted_index' )
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	listResultSet = []
	listSentChars = dict_geospatial_config['sent_token_seps']

	# get list of names we will not allow to be a prefix to a location phrase
	setNames = dict_geospatial_config['first_names']

	# loop on all token sets to geoparse
	for listTokens1g in token_set :

		listResult = []

		# try to match all phrases in the gram range requested
		# note: avoid using soton_corenlppy.common_parse_lib.create_ngram_tokens() as we will lose the token position
		#       in cases of newlines (ngram phrases are designed to not span sentence boundaries)
		nMaxGram = dict_geospatial_config['max_gram']
		for nTokenIndexStart in range(len(listTokens1g)) :
			for nTokenIndexEnd in range( nTokenIndexStart+1, nTokenIndexStart+1 + nMaxGram ) :
				if nTokenIndexEnd <= len(listTokens1g) :

					# get phrase to match with
					tuplePhrase = tuple( listTokens1g[ nTokenIndexStart:nTokenIndexEnd ] )

					# reject any phrase that spans a sentence
					bPhraseSpansSent = False
					for strToken in tuplePhrase :
						if strToken in listSentChars :
							bPhraseSpansSent = True
					if bPhraseSpansSent == True :
						break

					# check inverted index for a match (if none ignore phrase)
					if tuplePhrase in dict_inverted_index :
						setMatchIndex = dict_inverted_index[ tuplePhrase ]

						# check to see if the previous token is a first name AND location name length is 1 token
						# if so REJECT match as its part of a name
						# e.g. this is victoria derbyshire = NO
						# e.g. will new york city be great = YES
						# e.g. is Chelsea Smith going out tonight = YES (an error that is unavoidable as we have no context to decide if Chelsea is a person or location without POS tagging)
						# e.g. will London be great = NO (error but unavoidable as there might be a Will London out there!)
						bNameCheck = False
						if (nTokenIndexStart - 1 >= 0) and (len(tuplePhrase) == 1) :
							if listTokens1g[ nTokenIndexStart - 1 ] in setNames :
								bNameCheck = True

						if bNameCheck == False :
							listResult.append( (nTokenIndexStart, nTokenIndexEnd-1, setMatchIndex, tuplePhrase) )

		listResultSet.append( listResult )

	# all done
	return listResultSet
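The matching loop above is a sliding window of 1..max_gram tokens looked up in an inverted index. A self-contained sketch of that core loop (without the sentence-boundary and first-name checks, and with a hypothetical `match_phrases` helper name):

```python
# Standalone sketch of the n-gram matching loop: slide windows of
# 1..max_gram tokens over the text and look each phrase up in an
# inverted index of { phrase_tuple : set(location_row_indexes) }.
def match_phrases(tokens, inverted_index, max_gram=3):
    matches = []
    for start in range(len(tokens)):
        for end in range(start + 1, min(start + 1 + max_gram, len(tokens) + 1)):
            phrase = tuple(tokens[start:end])
            if phrase in inverted_index:
                # (token_start, token_end, matched_location_indexes, tokens)
                matches.append((start, end - 1, inverted_index[phrase], phrase))
    return matches

index = {('new', 'york'): {7}, ('york',): {3}}
print(match_phrases(['i', 'love', 'new', 'york'], index))
```

Note how both the bigram 'new york' and the subsumed unigram 'york' are returned; resolving that overlap is deferred to the confidence scoring in calc_location_confidence().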
[docs]def reverse_geocode_geom( list_geom, dict_geom_index, dict_geospatial_config ) :
	"""
	reverse geocode a list of OpenGIS geom objects and return all indexed geoms that intersect with each geom in this list

	| note: if OSM tag info is specified in the dict_geom_index then the geom with the highest admin level will be returned (i.e. most specific location returned, so road before suburb before city before country)

	:param list list_geom: list of serialized OpenGIS geoms to geoparse
	:param dict dict_geom_index: geom index from geo_parse_lib.calc_geom_index()
	:param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()

	:return: a set for each geom checked, with the set containing the ids of any intersecting geoms e.g. [ set( tuple_osmid1, tuple_osmid2 ... ), ... ]
	:rtype: list
	"""

	# check args without defaults
	if not isinstance( list_geom, list ) :
		raise Exception( 'invalid list_geom' )
	if not isinstance( dict_geom_index, dict ) :
		raise Exception( 'invalid dict_geom_index' )
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	# init
	listResult = []

	for strGeom in list_geom :

		# init
		listEntry = []
		nTopAdminLevel = 0

		# if no geom no matches
		if (strGeom != None) and (len(strGeom) > 0) :

			# make the shape and do the slow shapely intersects() function calls
			shapeGeom = shapely.wkt.loads( strGeom )
			shapeGeomPrep = shapely.prepared.prep( shapeGeom )

			# loop on all indexed geoms
			# use prepared geometry shapeLoc to call the intersects function as it is more efficient
			# do a quick envelope test first as it is orders of magnitude quicker than checking a complex polygon like Russia's border
			for tupleID in dict_geom_index :
				listGeoms = dict_geom_index[ tupleID ]
				for tupleGeomData in listGeoms :
					shapeLocPrep = tupleGeomData[0]
					shapeLocSimplePrep = tupleGeomData[1]
					shapeLoc = tupleGeomData[2]
					dictOSMTag = tupleGeomData[4]

					# optimization test results (ranked in order of speed - all very similar)
					# (1) envelope_prep.intersects() then geom_prep.intersects()
					# (2) geom_prep.intersects()
					# (3) geom.within()
					# (4) envelope_prep.contains() then geom_prep.contains() [intersects for polygons]
					# (5) geom_prep.contains() [intersects for polygons]

					# use intersects for polygon to polygon checks
					# check simple polygon first as a cheap spatial index test before using the full polygon
					bMatch = False
					if shapeLocSimplePrep.intersects( shapeGeom ) == True :
						if shapeLocPrep.intersects( shapeGeom ) == True :
							bMatch = True

					if bMatch == True :
						if 'admin level' in dictOSMTag :
							nAdminLevel = int( dictOSMTag['admin level'] )
						else :
							nAdminLevel = 100

						# add to final result list unless we have seen this osmid before (with a different geom)
						if not (tupleID,nAdminLevel) in listEntry :
							listEntry.append( (tupleID,nAdminLevel) )
							if nAdminLevel > nTopAdminLevel :
								nTopAdminLevel = nAdminLevel

		# only keep top OSM admin level results
		# since we want the most specific location to be matched NOT the super region
		# e.g. wall street > NY > USA
		setResult = set()
		for nIndex in range(len(listEntry)) :
			if listEntry[nIndex][1] == nTopAdminLevel :
				setResult.add( listEntry[nIndex][0] )
		listResult.append( setResult )

	# all done
	return listResult
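The "most specific location wins" filter at the end of this function reduces to: keep only matches whose admin level equals the highest level seen. A minimal sketch with a hypothetical `most_specific` helper (higher admin level = more specific, 100 = non-admin feature such as a road):

```python
# Sketch of the admin-level filter: keep only matches at the highest
# (most specific) admin level, so road beats city beats country.
def most_specific(matches):
    # matches: list of (location_id, admin_level) pairs
    if not matches:
        return set()
    top = max(level for _, level in matches)
    return {loc for loc, level in matches if level == top}

print(most_specific([(('usa',), 2), (('ny',), 5), (('wall st',), 100)]))
# {('wall st',)}
```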
[docs]def create_matched_location_list( list_match, cached_locations, osmid_lookup ) :
	"""
	create a list of locations based on a set of matches and the original cached location table

	:param list list_match: list of location matches from geo_parse_lib.geoparse_token_set()
	:param list cached_locations: list of cached locations from geo_preprocess_lib.cache_preprocessed_locations()
	:param dict osmid_lookup: lookup table mapping an osmid to a set of rows in the cached locations from geo_parse_lib.calc_osmid_lookup()

	:return: list of matched locations with all of the geom information e.g. [[<loc_id>, <token_start>, <token_end>, loc_tokens, geom, (<osm_id>, ...), {<osm_tag>:<value>}*N_tags, (<osm_id_parent>, ...)] ...]
	:rtype: list
	"""

	# check args without defaults
	if not isinstance( list_match, list ) :
		raise Exception( 'invalid list_match' )
	if not isinstance( cached_locations, list ) :
		raise Exception( 'invalid cached_locations' )
	if not isinstance( osmid_lookup, dict ) :
		raise Exception( 'invalid osmid_lookup' )

	# create a list of annotated location matches with super region information useful for disambiguation
	listLocMatches = []
	for tupleMatch in list_match :
		for tupleOSMIDs in tupleMatch[2] :
			setIndexLoc = osmid_lookup[ tupleOSMIDs ]
			for nIndexLoc in setIndexLoc :
				listEntry = [
					cached_locations[nIndexLoc][0],
					tupleMatch[0],
					tupleMatch[1],
					tupleMatch[3],
					cached_locations[nIndexLoc][4],
					cached_locations[nIndexLoc][2],
					cached_locations[nIndexLoc][5],
					cached_locations[nIndexLoc][3] ]
				listLocMatches.append( listEntry )

	return listLocMatches
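The function is essentially a join: each (token_start, token_end, osmid_set, tokens) match is expanded into one output row per cached-location row for each matched osmid. A self-contained sketch with toy data (the cached-row column layout below mirrors the index positions used in the code; `join_matches` is a hypothetical helper name):

```python
# Sketch of the match/cache join: expand each match into one annotated row
# per cached-location row, reordering cached columns into the output layout
# [loc_id, token_start, token_end, tokens, geom, osm_ids, osm_tags, parents].
def join_matches(matches, cached, lookup):
    rows = []
    for start, end, osmids, tokens in matches:
        for osmid in osmids:
            for row_index in lookup[osmid]:
                loc = cached[row_index]
                # cached row layout assumed: 0=loc_id, 2=osm_ids, 3=parents, 4=geom, 5=tags
                rows.append([loc[0], start, end, tokens, loc[4], loc[2], loc[5], loc[3]])
    return rows

cached = [['loc1', None, (10,), (20,), 'POINT(0 0)', {'name': 'x'}]]
lookup = {(10,): {0}}
matches = [(2, 3, {(10,)}, ('new', 'york'))]
print(join_matches(matches, cached, lookup))
```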
[docs]def calc_location_confidence( list_loc, dict_geospatial_config, index_token_start = 1, index_token_end = 2, index_osm_id = 5, index_osm_parents = 7, index_osm_tags = 6, semantic_distance = 6, index_geom = 4, geom_distance = 0.25, index_loc_tokens = 3, confidence_tests = (1,2,3,4), geom_context = None, geom_cache = {} ) :
	"""
	calculate confidence values for a set of location matches originating from a concatenated set of geo_parse_lib.geoparse_token_set() results

	:param list list_loc: list of locations with geom information from geo_parse_lib.create_matched_location_list()
	:param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()
	:param int index_token_start: index of matched token start position in list_loc
	:param int index_token_end: index of matched token end position in list_loc
	:param int index_osm_id: index of OSM ID in list_loc
	:param int index_osm_parents: index of OSM IDs of super regions to this matched location in list_loc
	:param int index_osm_tags: index of OSM tags in list_loc
	:param int semantic_distance: number of tokens (left and right) to look for semantically nearby location checks e.g. 'London in UK'
	:param int index_geom: index of serialized OpenGIS geom in list_loc
	:param float geom_distance: distance for shapely distance check (in degrees)
	:param int index_loc_tokens: index of matched loc tokens
	:param tuple confidence_tests: confidence check tests to run when calculating a confidence value. 1 = token subsumption, 2 = nearby parent region, 3 = nearby locations and nearby geotag, 4 = general before specific
	:param str geom_context: optional serialized OpenGIS geom (e.g. a geotag) checked against each location for intersection and closeness
	:param dict geom_cache: cache of geom checks with distance and intersection results to avoid running the same shapely checks twice. this cache will be populated with any new geoms that are checked using shapely so might get large over time. e.g. dict{ strGeom : [ set(close_tuple_IDs), set(not_close_tuple_IDs), set(intersects_tuple_IDs), set(not_intersects_tuple_IDs) ] }

	:return: a list of confidence values (0..300) for each location in list_loc. locations with a common token can be ranked by confidence and the highest value taken. a confidence of 0 means the location should be rejected regardless. closeby locations score 2+. super regions present in text score 10+. a geocontext intersecting a location scores 200+ and closeness scores 100+
	:rtype: list
	"""

	# check args without defaults
	if not isinstance( list_loc, list ) :
		raise Exception( 'invalid list_loc' )
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	# checks
	if semantic_distance < 1 :
		raise Exception( 'semantic distance < 1' )
	if len(list_loc) == 0 :
		return []

	# copy then append the index as we will sort it next and we dont want to lose the original order for the result!
	listLocs = copy.deepcopy( list_loc )
	listResult = []
	nOriginalIndex = len(listLocs[0])
	for nIndexCount in range(len(listLocs)) :
		# add master index to end of copy of the source list
		listLocs[nIndexCount].append( nIndexCount )

		# 0 confidence for subsumed tokens is default
		listResult.append( 0 )

	# sort list by 1st phrase size, 2nd token position and 3rd length of way (big polygons are probably better than single points)
	# key = (1 + token_end - token_start) + 1.0/(2 + token_start) + 0.000001 * strGeom.count(',')
	if not isinstance( listLocs[0][index_geom], str ) :
		raise Exception( 'loc list geom index value is not of type <str>' )
	listLocs = sorted( listLocs, key=lambda entry: (1 + entry[index_token_end] - entry[index_token_start]) + 1.0/(2+entry[index_token_start]) + 0.000001 * entry[index_geom].count(','), reverse=True )

	# create geom index ONLY if its needed as shape work is slow
	dictGeomIndex = None

	# if we have a geom_context make it into a shape
	shapeGeomContext = None
	shapeGeomContextPoint = None
	if geom_context != None :
		shapeGeomContext = shapely.wkt.loads( geom_context )
		shapeGeomContextPoint = shapeGeomContext.representative_point()

	# phase 1 - find valid phrases and avoid subsumption
	if 1 in confidence_tests :
		for nLocIndex1 in range(len(listLocs)) :

			# check index types for safety
			if not isinstance( listLocs[nLocIndex1][index_token_start], int ) :
				raise Exception( 'loc list token start index value is not of type <int>' )
			if not isinstance( listLocs[nLocIndex1][index_token_end], int ) :
				raise Exception( 'loc list token end index value is not of type <int>' )
			if (not isinstance( listLocs[nLocIndex1][index_osm_id], tuple )) and (not isinstance( listLocs[nLocIndex1][index_osm_id], list )) :
				raise Exception( 'loc list OSM ID array index value is not of type <tuple>' )
			if (not isinstance( listLocs[nLocIndex1][index_osm_parents], tuple )) and (not isinstance( listLocs[nLocIndex1][index_osm_parents], list )) :
				raise Exception( 'loc list parent OSM ID array index value is not of type <tuple>' )

			# get token range of this location
			nTokenStart = listLocs[nLocIndex1][index_token_start]
			nTokenEnd = listLocs[nLocIndex1][index_token_end]
			nGram = 1 + nTokenEnd - nTokenStart

			# calc initial confidence
			# reject subsumed tokens that have already been used for a higher gram phrase match
			# note: allow tokens to overlap if same gram as these could be perfectly valid matches
			# e.g. Russia, RU and Russia, Ohio, USA are 1 gram and will overlap (completely) => allow both to match with confidence 1
			# i.e. subsumption test
			# note: this works because we have already sorted the list by phrase gram size so we get larger ones first
			bOverlap = False
			for nLocIndex2 in range(nLocIndex1) :
				# get token range of this previous location
				nTokenStartPrevious = listLocs[nLocIndex2][index_token_start]
				nTokenEndPrevious = listLocs[nLocIndex2][index_token_end]
				nGramPrevious = 1 + nTokenEndPrevious - nTokenStartPrevious
				if nGramPrevious > nGram :
					if (nTokenEnd >= nTokenStartPrevious) and (nTokenStart <= nTokenEndPrevious) :
						bOverlap = True
						break

			if bOverlap == False :
				# 1 confidence for valid phrases (we will count parents in second phase)
				listResult[ listLocs[nLocIndex1][nOriginalIndex] ] = 1

	# phase 2 - parent check (worth 10)
	# if OSMID admin parents specified check for them and add conf for each one nearby
	# note: ignore geotags for this check as closeness to geotag is checked in phase 3
	if 2 in confidence_tests :
		for nLocIndex1 in range(len(listLocs)) :
			nTokenStart = listLocs[nLocIndex1][index_token_start]
			nTokenEnd = listLocs[nLocIndex1][index_token_end]
			tupleOSMID1 = tuple( listLocs[nLocIndex1][index_osm_id] )
			tupleLocPhrase1 = tuple( listLocs[nLocIndex1][index_loc_tokens] )

			# dont check ourselves
			setOSMIDChecked = set([tupleOSMID1])

			if listResult[ listLocs[nLocIndex1][nOriginalIndex] ] > 0 :
				nConf = listResult[ listLocs[nLocIndex1][nOriginalIndex] ]

				for nLocIndex2 in range(len(listLocs)) :
					nTokenStart2 = listLocs[nLocIndex2][index_token_start]
					nTokenEnd2 = listLocs[nLocIndex2][index_token_end]
					tupleOSMID2 = tuple( listLocs[nLocIndex2][index_osm_id] )
					tupleLocPhrase2 = tuple( listLocs[nLocIndex2][index_loc_tokens] )

					# do not allow locs with the same name to act as parents
					# e.g. island groups like shetlands OR admin polygon & admin_centre node with same name
					if tupleLocPhrase1 == tupleLocPhrase2 :
						continue

					if (not tupleOSMID2 in setOSMIDChecked) and (listResult[ listLocs[nLocIndex2][nOriginalIndex] ] > 0):

						# dont check same OSMID many times (e.g. if it has many geoms)
						setOSMIDChecked.add( tupleOSMID2 )

						# is this location within the semantic distance to check for a parent? but not the same phrase
						bCheck = False
						if (nTokenStart2 > nTokenEnd) and (nTokenStart2 <= nTokenEnd + semantic_distance) :
							bCheck = True
						if (nTokenEnd2 < nTokenStart) and (nTokenEnd2 >= nTokenStart - semantic_distance) :
							bCheck = True

						# check all OSMIDs of this possible parent against the declared parents of the primary location
						if bCheck == True :
							setPrimaryOSMID = set( listLocs[nLocIndex1][index_osm_parents] )
							setSecondaryOSMID = set( listLocs[nLocIndex2][index_osm_id] )

							# if we have admin parents use them
							if len( setPrimaryOSMID & setSecondaryOSMID ) > 0 :
								nConf = nConf + 10

				listResult[ listLocs[nLocIndex1][nOriginalIndex] ] = nConf

	# phase 3 - check inter-loc closeness (geocontext intersect worth 100, nearby location worth 1)
	# check all loc geoms
	#   if loc2 is within a semantic distance and != same name (i.e. try to avoid liverpool, liverpool)
	#     if loc2 is closeby ==> add confidence
	#   if loc2 is without a textual ref
	#     if loc2 is closeby OR intersects ==> add confidence
	# note: shapely distance() is slow so we only compare representative point distance()
	#       which is fine for streets and buildings but inaccurate for large regions (e.g. Russia)
	#       so we need BOTH intersects and closeby tests for non-textual refs
	# note: we do not have the original phrase so can only check for a common tag:name not the actual text phrase
	if 3 in confidence_tests :

		# loop on all locs (1)
		for nLocIndex1 in range(len(listLocs)) :
			if listResult[ listLocs[nLocIndex1][nOriginalIndex] ] > 0 :

				# get existing conf value
				nConf = listResult[ listLocs[nLocIndex1][nOriginalIndex] ]
				tupleOSMID1 = tuple( listLocs[nLocIndex1][index_osm_id] )
				nTokenEnd1 = listLocs[nLocIndex1][index_token_end]
				nTokenStart1 = listLocs[nLocIndex1][index_token_start]
				tupleLocPhrase1 = tuple( listLocs[nLocIndex1][index_loc_tokens] )
				strGeom1 = listLocs[nLocIndex1][index_geom]

				# make sure this geom appears in the cache
				if not strGeom1 in geom_cache :
					geom_cache[strGeom1] = [ set([]),set([]),set([]),set([]) ]

				# check geom_context (if we have one) for both intersection and distance
				# geom_context matches add 100 to confidence as this is a strong indication the loc is correct and should override other evidence
				if shapeGeomContext != None :
					if dictGeomIndex == None :
						# index location geoms using the OSMID array as a unique identifier to avoid calculating duplicate geoms
						# dictGeomIndex = dict <- calc_geom_index() = { tupleOSMID : [ ( shapely_geom_prepared, shapely_envelope, shapely_geom, shapely_representative_point, dict_OSM_tag, geom_serialized ), ...
] } # dictGeomResultsCache = { tupleOSMID : [ set(close_tuple_IDs), set(not_close_tuple_IDs), set(intersects_tuple_IDs), set(not_intersects_tuple_IDs), set(serialized_geom) ] } # note: only create it when its needed as its slow to do geom work dictGeomIndex = calc_geom_index( listLocs, index_geom = index_geom, index_id = index_osm_id, index_osm_tag = None ) # get the polygon for this location (osmids can have multiple polygons so get right one e.g. island groups) shapeLoc1 = None shapePoint1 = None for tupleGeomData in dictGeomIndex[tupleOSMID1] : if strGeom1 == tupleGeomData[5] : # use prepared geom for shapely match efficiency shapeLoc1 = tupleGeomData[0] # use representative point for distance() as using the full geom is much too slow # for complex regions like russia (e.g. 1 min calc time which does not scale to 1000's of calls) shapePoint1 = tupleGeomData[3] break if shapeLoc1 != None : # calc intersections between two shapes. if so add to confidence if (strGeom1.lower().startswith( 'polygon(' )) and (shapeLoc1.intersects( shapeGeomContext ) == True) : nConf = nConf + 200 # calc distance between two shapes and check if it is within a given closeness. if so add to confidence if shapePoint1.distance( shapeGeomContextPoint ) <= geom_distance : nConf = nConf + 100 # loop on all locs (2) for nLocIndex2 in range(len(listLocs)) : if (nLocIndex2 != nLocIndex1) and (nTokenStart1 != nTokenStart2) and (nTokenEnd1 != nTokenEnd2) : if listResult[ listLocs[nLocIndex2][nOriginalIndex] ] > 0 : tupleOSMID2 = tuple( listLocs[nLocIndex2][index_osm_id] ) nTokenStart2 = listLocs[nLocIndex2][index_token_start] nTokenEnd2 = listLocs[nLocIndex2][index_token_end] tupleLocPhrase2 = tuple( listLocs[nLocIndex2][index_loc_tokens] ) strGeom2 = listLocs[nLocIndex2][index_geom] # do not add confidence for spatially close locations with the same name # e.g. 
island groups like shetlands OR admin polygon & admin_centre node with same name if tupleLocPhrase1 == tupleLocPhrase2 : continue # make sure this geom appears in cache if not strGeom2 in geom_cache : geom_cache[strGeom2] = [ set([]),set([]),set([]),set([]) ] # is this location within the semantic distance to check for parent? (but not the same phrase) # note: always compare distance to non-text geoms as semantic distance has no meaning for these # and a location mention nearby a geotag will disambiguate that location mention nicely bCheckDistance = False bCheckIntersects = False if (nTokenStart2 > nTokenEnd1) and (nTokenStart2 <= nTokenEnd1 + semantic_distance) : bCheckDistance = True if (nTokenEnd2 < nTokenStart1) and (nTokenEnd2 >= nTokenStart1 - semantic_distance) : bCheckDistance = True # check if loc geom is actually the same OSMID as the target # - if so dont check as this would be self-confirming # note: use set intersection as locs can have multiple OSMID's (e.g. group of islands) setOSMID1 = set( listLocs[nLocIndex1][index_osm_id] ) setOSMID2 = set( listLocs[nLocIndex2][index_osm_id] ) if len( setOSMID1 & setOSMID2 ) > 0 : bCheckDistance = False bCheckIntersects = False continue # check parent lists # - if loc geom is a parent then dont check it as phase 2 subsumption test will have added confidence already (worth 10) # - if loc geom is a child then add confidence # note: this avoids a needless lookup (distance & intersection) if bCheckIntersects == True or bCheckDistance == True : setParentOSMID1 = set( listLocs[nLocIndex1][index_osm_parents] ) setParentOSMID2 = set( listLocs[nLocIndex2][index_osm_parents] ) if len( setParentOSMID1 ) > 0 : if len( setParentOSMID1 & setOSMID2 ) > 0 : bCheckDistance = False bCheckIntersects = False continue if len( setParentOSMID2 ) > 0 : if len( setParentOSMID2 & setOSMID1 ) > 0 : nConf = nConf + 1 # break so we only add 1 to conf for a closeness match break # check intersection if bCheckIntersects == True : # have we 
done this geom comparison before? if so reuse result # otherwise do a shape comparison to check for geographic subsumption listGeomResult = geom_cache[strGeom1] if tupleOSMID2 in listGeomResult[2] : nConf = nConf + 1 #dict_geospatial_config['logger'].info( 'GEO_INTERSECT (cached) = ' + repr( listLocs[nLocIndex1][nOriginalIndex] ) + ' close_to ' + repr( listLocs[nLocIndex2][nOriginalIndex] ) ) # break so we only add 1 to conf for a intersects match break elif tupleOSMID2 in listGeomResult[3] : #dict_geospatial_config['logger'].info( 'NOT GEO_INTERSECT (cached) = ' + repr( listLocs[nLocIndex1][nOriginalIndex] ) + ' close_to ' + repr( listLocs[nLocIndex2][nOriginalIndex] ) ) pass else : if dictGeomIndex == None : # index location geoms using the OSMID array as a unique identifier to avoid calculating duplicate geoms # dictGeomIndex = dict <- calc_geom_index() = { tupleOSMID : [ ( shapely_geom_prepared, shapely_envelope, shapely_geom, shapely_representative_point, dict_OSM_tag, geom_serialized ), ... ] } # dictGeomResultsCache = { tupleOSMID : [ set(close_tuple_IDs), set(not_close_tuple_IDs), set(intersects_tuple_IDs), set(not_intersects_tuple_IDs), set(serialized_geom) ] } # note: only create it when its needed as its slow to do geom work dictGeomIndex = calc_geom_index( listLocs, index_geom = index_geom, index_id = index_osm_id, index_osm_tag = None ) # get the polygon for this location (osmids can have multiple polygons so get right one e.g. island groups) shapeLoc1 = None shapePoint1 = None for tupleGeomData in dictGeomIndex[tupleOSMID1] : if strGeom1 == tupleGeomData[5] : # use prepared geom for shapely match efficiency shapeLoc1 = tupleGeomData[0] # use representative point for distance() as using the full geom is much too slow # for complex regions like russia (e.g. 1 min calc time which does not scale to 1000's of calls) shapePoint1 = tupleGeomData[3] break # get the polygon for this location (osmids can have multiple polygons so get right one e.g. 
island groups) shapeLoc2 = None shapePoint2 = None for tupleGeomData in dictGeomIndex[tupleOSMID2] : if strGeom2 == tupleGeomData[5] : # use normal geom (shapely prepared shape intersect function needs a normal shape to work on) shapeLoc2 = tupleGeomData[2] # use representative point for distance() as using the full geom is much too slow # for complex regions like russia (e.g. 1 min calc time which does not scale to 1000's of calls) shapePoint2 = tupleGeomData[3] break if (shapeLoc1 != None) and (shapeLoc2 != None) : # debug #dict_geospatial_config['logger'].info( 'TEST GEO 2 = ' + repr( [tupleOSMID1,tupleOSMID2] ) ) #dict_geospatial_config['logger'].info( 'TEST 3 = ' + repr( [setParentOSMID1,setOSMID2] ) ) #dict_geospatial_config['logger'].info( 'TEST 4 = ' + repr( [nTokenStart1,nTokenStart2] ) ) #dict_geospatial_config['logger'].info( 'phase3 shapely intersect check = ' + repr( [tupleOSMID1,tupleOSMID2] ) ) # calc intersections between two shapes. if so add to confidence if shapeLoc1.intersects( shapeLoc2 ) == True : nConf = nConf + 1 #dict_geospatial_config['logger'].info('GEO_INTERSECT = ' + repr( [listLocs[nLocIndex1][nOriginalIndex],listLocs[nLocIndex2][nOriginalIndex]] )) # remember loc1 -> loc2 listGeomResult[2].add( tupleOSMID2 ) geom_cache[strGeom1] = listGeomResult # remember loc2 -> loc1 listGeomResult = geom_cache[strGeom2] if not tupleOSMID1 in listGeomResult[2] : listGeomResult[2].add( tupleOSMID1 ) geom_cache[strGeom2] = listGeomResult # break so we only add 1 to conf for a closeness match break else : # dict_geospatial_config['logger'].info('NOT GEO_LOC = ' + repr( [listLocs[nLocIndex1][nOriginalIndex],listLocs[nLocIndex2][nOriginalIndex]] )) # remember NOT loc1 -> loc2 listGeomResult[3].add( tupleOSMID2 ) geom_cache[tupleOSMID1] = listGeomResult # remember NOT loc2 -> loc1 listGeomResult = geom_cache[strGeom2] if not tupleOSMID1 in listGeomResult[2] : listGeomResult[3].add( tupleOSMID1 ) geom_cache[strGeom2] = listGeomResult # check distance if 
bCheckDistance == True : # have we done this geom comparison before? if so reuse result # otherwise do a shape comparison to check for geographic subsumption listGeomResult = geom_cache[strGeom1] if tupleOSMID2 in listGeomResult[0] : nConf = nConf + 1 #dict_geospatial_config['logger'].info( 'GEO_LOC (cached) = ' + repr( listLocs[nLocIndex1][nOriginalIndex] ) + ' close_to ' + repr( listLocs[nLocIndex2][nOriginalIndex] ) ) # break so we only add 1 to conf for a intersects match break elif tupleOSMID2 in listGeomResult[1] : #dict_geospatial_config['logger'].info( 'NOT GEO_LOC (cached) = ' + repr( listLocs[nLocIndex1][nOriginalIndex] ) + ' close_to ' + repr( listLocs[nLocIndex2][nOriginalIndex] ) ) pass else : if dictGeomIndex == None : # index location geoms using the OSMID array as a unique identifier to avoid calculating duplicate geoms # dictGeomIndex = dict <- calc_geom_index() = { tupleOSMID : [ ( shapely_geom_prepared, shapely_envelope, shapely_geom, shapely_representative_point, dict_OSM_tag, geom_serialized ), ... ] } # dictGeomResultsCache = { tupleOSMID : [ set(close_tuple_IDs), set(not_close_tuple_IDs), set(intersects_tuple_IDs), set(not_intersects_tuple_IDs), set(serialized_geom) ] } # note: only create it when its needed as its slow to do geom work dictGeomIndex = calc_geom_index( listLocs, index_geom = index_geom, index_id = index_osm_id, index_osm_tag = None ) # get the polygon for this location (osmids can have multiple polygons so get right one e.g. island groups) shapeLoc1 = None shapePoint1 = None for tupleGeomData in dictGeomIndex[tupleOSMID1] : if strGeom1 == tupleGeomData[5] : # use prepared geom for shapely match efficiency shapeLoc1 = tupleGeomData[0] # use representative point for distance() as using the full geom is much too slow # for complex regions like russia (e.g. 
1 min calc time which does not scale to 1000's of calls) shapePoint1 = tupleGeomData[3] break # get the polygon for this location (osmids can have multiple polygons so get right one e.g. island groups) shapeLoc2 = None shapePoint2 = None for tupleGeomData in dictGeomIndex[tupleOSMID2] : if strGeom2 == tupleGeomData[5] : # use normal geom (shapely prepared shape intersect function needs a normal shape to work on) shapeLoc2 = tupleGeomData[2] # use representative point for distance() as using the full geom is much too slow # for complex regions like russia (e.g. 1 min calc time which does not scale to 1000's of calls) shapePoint2 = tupleGeomData[3] break if (shapeLoc1 != None) and (shapeLoc2 != None) : # debug #dict_geospatial_config['logger'].info( 'TEST GEO 2 = ' + repr( [tupleOSMID1,tupleOSMID2] ) ) #dict_geospatial_config['logger'].info( 'TEST 3 = ' + repr( [setParentOSMID1,setOSMID2] ) ) #dict_geospatial_config['logger'].info( 'TEST 4 = ' + repr( [nTokenStart1,nTokenStart2] ) ) #dict_geospatial_config['logger'].info( 'phase3 shapely distance check = ' + repr( [tupleOSMID1,tupleOSMID2] ) ) # calc distance between two shapes and check if it is within a # given closeness. 
if so add to confidence if shapePoint1.distance( shapePoint2 ) <= geom_distance : nConf = nConf + 1 #dict_geospatial_config['logger'].info('GEO_LOC = ' + repr( [listLocs[nLocIndex1][nOriginalIndex],listLocs[nLocIndex2][nOriginalIndex]] )) # remember loc1 -> loc2 listGeomResult[0].add( tupleOSMID2 ) geom_cache[strGeom1] = listGeomResult # remember loc2 -> loc1 listGeomResult = geom_cache[strGeom2] if not tupleOSMID1 in listGeomResult[0] : listGeomResult[0].add( tupleOSMID1 ) geom_cache[strGeom2] = listGeomResult # break so we only add 1 to conf for a closeness match break else : # dict_geospatial_config['logger'].info('NOT GEO_LOC = ' + repr( [listLocs[nLocIndex1][nOriginalIndex],listLocs[nLocIndex2][nOriginalIndex]] )) # remember NOT loc1 -> loc2 listGeomResult[1].add( tupleOSMID2 ) geom_cache[strGeom1] = listGeomResult # remember NOT loc2 -> loc1 listGeomResult = geom_cache[strGeom2] if not tupleOSMID1 in listGeomResult[1] : listGeomResult[1].add( tupleOSMID1 ) geom_cache[strGeom2] = listGeomResult # assert new conf value listResult[ listLocs[nLocIndex1][nOriginalIndex] ] = nConf # debug """ for nLocIndex1 in range(len(listLocs)) : dict_geospatial_config['logger'].info('TEST3 = ' + str(listLocs[nLocIndex1][1]) + ' : ' + str(listLocs[nLocIndex1][index_osm_id]) + ':' + str(listLocs[nLocIndex1][2]) + ' = ' + str(listLocs[nLocIndex1][3]) + ' = ' + str(listResult[ listLocs[nLocIndex1][nOriginalIndex] ]) ) """ # phase 4 - admin level disambiguation # for contested tokens choose the location(s) with the lowest OSM admin level # and discount no admin level choices (buildings, roads) and points (e.g. 
admin centres which should have a polygon option available anyway) # so 'London' city in UK is preferred to 'London' province in Canada if 4 in confidence_tests : for nLocIndex1 in range(len(listLocs)) : nTokenStart1 = listLocs[nLocIndex1][index_token_start] nTokenEnd1 = listLocs[nLocIndex1][index_token_end] if not isinstance( listLocs[nLocIndex1][index_osm_tags], dict ) : raise Exception( 'OSM tag array not type <dict>' ) dictOSMTags1 = listLocs[nLocIndex1][index_osm_tags] tupleOSMID1 = tuple( listLocs[nLocIndex1][index_osm_id] ) strGeom1 = listLocs[nLocIndex1][index_geom] # if location has an admin level then if we find a location with a lower level set current loc confidence to 0 # note: OSM admin level is from 1 to 10, so use 12 for no value to always fail in comparison # note: some points have an admin value (e.g. admin centre linked to a cities way) so discount them also as we want the polygon to be chosen over the admin centre point if 'admin level' in dictOSMTags1 : nAdminLevel1 = int( dictOSMTags1['admin level'] ) if strGeom1.lower().startswith( 'point(' ) : nAdminLevel1 = 11 else : nAdminLevel1 = 12 # get confidence level of current token nConf1 = listResult[ listLocs[nLocIndex1][nOriginalIndex] ] if nConf1 > 0 : for nLocIndex2 in range(len(listLocs)) : nConf2 = listResult[ listLocs[nLocIndex2][nOriginalIndex] ] nTokenStart2 = listLocs[nLocIndex2][index_token_start] nTokenEnd2 = listLocs[nLocIndex2][index_token_end] tupleOSMID2 = tuple( listLocs[nLocIndex2][index_osm_id] ) if not isinstance( listLocs[nLocIndex2][index_osm_tags], dict ) : raise Exception( 'OSM tag array not type <dict>' ) dictOSMTags2 = listLocs[nLocIndex2][index_osm_tags] strGeom2 = listLocs[nLocIndex2][index_geom] # is this loc for the same token? is confdience level the same? 
# if so use admin_level to disambiguate and zero the confidence of the higher admin level location if (tupleOSMID1 != tupleOSMID2) and (nTokenStart1 == nTokenStart2) and (nTokenEnd1 == nTokenEnd2) and (nConf1 == nConf2) : if 'admin level' in dictOSMTags2 : nAdminLevel2 = int( dictOSMTags2['admin level'] ) if strGeom2.lower().startswith( 'point(' ) : nAdminLevel2 = 11 else : nAdminLevel2 = 12 # if admin level of this other loc < admin level of current loc then set the current loc confidence to 0 as we should prefer other loc if nAdminLevel2 < nAdminLevel1 : #DEBUG #dict_geospatial_config['logger'].info( 'DEBUG Phase 4 : ' + repr(tupleOSMID2) + ' admin level ' + str(nAdminLevel2) + ' > ' + repr(tupleOSMID1) + ' admin level ' + str(nAdminLevel1) ) nConf1 = 0 break # update confidence level of current token (i.e. 0 if we have found a location using same token but with a higher admin level) listResult[ listLocs[nLocIndex1][nOriginalIndex] ] = nConf1 # debug """ for nLocIndex1 in range(len(listLocs)) : dict_geospatial_config['logger'].info('TEST4 = ' + str(listLocs[nLocIndex1][1]) + ' : ' + str(listLocs[nLocIndex1][index_osm_id]) + ':' + str(listLocs[nLocIndex1][2]) + ' = ' + str(listLocs[nLocIndex1][3]) + ' = ' + str(listResult[ listLocs[nLocIndex1][nOriginalIndex] ]) ) """ # tidy up if dictGeomIndex != None : del dictGeomIndex # all done return listResult
def filter_matches_by_confidence( list_loc, dict_geospatial_config, index_token_start = 1, index_token_end = 2, index_osm_id = 5, index_osm_parents = 7, index_osm_tags = 6, semantic_distance = 6, index_geom = 4, geom_distance = 0.25, index_loc_tokens = 3, confidence_tests = (1,2,3,4), geom_context = None, geom_cache = {} ) :
	"""
	filter a list of matches by match confidence using geo_parse_lib.calc_location_confidence() scores. only the highest ranked locations for each token will be kept, with the others removed from the list

	:param list list_loc: list of locations with geom information from geo_parse_lib.create_matched_location_list(). this list will be filtered with rows removed that rank low on match confidence.
	:param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()
	:param int index_token_start: index of matched token start position in list_loc
	:param int index_token_end: index of matched token end position in list_loc
	:param int index_osm_id: index of OSM ID in list_loc
	:param int index_osm_parents: index of OSM IDs of super regions of this matched location in list_loc
	:param int index_osm_tags: index of OSM tags in list_loc
	:param int semantic_distance: number of tokens (left and right) to look for semantically nearby location checks e.g. 'London in UK'
	:param int index_geom: index of serialized OpenGIS geom in list_loc
	:param float geom_distance: distance for shapely distance check (in degrees)
	:param int index_loc_tokens: index of matched loc tokens
	:param tuple confidence_tests: confidence check tests to run when calculating a confidence value. 1 = token subsumption, 2 = nearby parent region, 3 = nearby locations and nearby geotag, 4 = general before specific
	:param dict geom_cache: cache of geom checks with distance and intersection results to avoid running the same shapely checks twice. this cache will be populated with any new geoms that are checked using shapely so might get large over time. e.g. dict{ strGeom : [ set(close_tuple_IDs), set(not_close_tuple_IDs), set(intersects_tuple_IDs), set(not_intersects_tuple_IDs) ] }
	:return: None. list_loc is filtered in-place, with a confidence value appended to each surviving row. semantically close locations provide scores 1+. geotags inside locations provide scores 100+. a confidence of 0 means the location is rejected regardless.
	"""

	# check args without defaults
	if not isinstance( list_loc, list ) :
		raise Exception( 'invalid list_loc' )
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	# remove exact duplicates (token range, OSM id, geom)
	for nMatchIndex1 in range(len(list_loc)) :
		listEntry1 = list_loc[nMatchIndex1]
		if listEntry1 != None :
			for nMatchIndex2 in range(nMatchIndex1+1,len(list_loc)) :
				listEntry2 = list_loc[nMatchIndex2]
				if listEntry2 != None :
					if (listEntry1[index_token_start] == listEntry2[index_token_start]) and (listEntry1[index_token_end] == listEntry2[index_token_end]) and (listEntry1[index_osm_id] == listEntry2[index_osm_id]) and (listEntry1[index_geom] == listEntry2[index_geom]) :
						# void duplicate (inc geom)
						list_loc[nMatchIndex2] = None

	# delete voided rows
	while list_loc.count( None ) :
		list_loc.remove( None )

	# calc confidence values for each location match
	listLocConfidence = calc_location_confidence( list_loc = list_loc, index_token_start = index_token_start, index_token_end = index_token_end, index_osm_id = index_osm_id, index_osm_parents = index_osm_parents, index_osm_tags = index_osm_tags, semantic_distance = semantic_distance, dict_geospatial_config = dict_geospatial_config, index_geom = index_geom, geom_distance = geom_distance, index_loc_tokens = index_loc_tokens, confidence_tests = confidence_tests, geom_context = geom_context, geom_cache = geom_cache )

	# remove location matches that have a 0 confidence as these are incidental matches of phrase gram tokens within higher gram phrases
	# and for the rest append the conf value to the match list data
	# listLocInfo = [ <source_loc_id>, <token_start>, <token_end>, loc_tokens, geom, (<osm_id>, ...), {<osm_tag>:<value>}*N_tags, (<osm_id_parent>, ...), confidence ]*N_locs
	nIndexConf = -1
	for nMatchIndex1 in range(len(list_loc)) :
		nConf = listLocConfidence[ nMatchIndex1 ]
		if nConf == 0 :
			list_loc[nMatchIndex1] = None
		else :
			list_loc[nMatchIndex1].append( nConf )
			nIndexConf = len( list_loc[nMatchIndex1] ) - 1
	while list_loc.count( None ) :
		list_loc.remove( None )

	# where we have multiple loc matches for the same phrase remove all matches that do not have the highest confidence value
	# e.g. Donetsk, Ukraine + Donetsk, Russia ==> if Ukraine conf is higher lose the Russia location
	# (a) make index of phrases (b) calc top conf for each phrase (c) remove low conf phrase matches
	dictPhrase = {}
	for nMatchIndex1 in range(len(list_loc)) :
		# tuplePhrase = ( <token_start>, <token_end> )
		tuplePhrase = ( list_loc[nMatchIndex1][index_token_start], list_loc[nMatchIndex1][index_token_end] )
		if not tuplePhrase in dictPhrase :
			dictPhrase[ tuplePhrase ] = [nMatchIndex1]
		else :
			dictPhrase[ tuplePhrase ].append( nMatchIndex1 )
	for tuplePhrase1 in dictPhrase :
		# get max
		nConfMax = 0
		for nMatchIndex1 in dictPhrase[ tuplePhrase1 ] :
			if nConfMax < list_loc[nMatchIndex1][nIndexConf] :
				nConfMax = list_loc[nMatchIndex1][nIndexConf]
		# void non-max
		for nMatchIndex1 in dictPhrase[ tuplePhrase1 ] :
			if list_loc[nMatchIndex1][nIndexConf] < nConfMax :
				list_loc[nMatchIndex1][nIndexConf] = 0

	# remove location matches that have a 0 confidence (again)
	for nMatchIndex1 in range(len(list_loc)) :
		nConf = list_loc[nMatchIndex1][nIndexConf]
		if nConf == 0 :
			list_loc[nMatchIndex1] = None
	while list_loc.count( None ) :
		list_loc.remove( None )
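The final step above (keep only the highest-confidence match per token phrase) reduces to a small grouping operation. A minimal sketch, assuming a simplified match record with hypothetical `'span'` and `'conf'` keys rather than the real list_loc row layout:

```python
# Sketch: keep, for each (token_start, token_end) phrase, only the matches
# with the top non-zero confidence (illustrative, not the geoparsepy rows).
def keep_best_per_phrase(matches):
    best = {}
    for m in matches:
        best[m['span']] = max(best.get(m['span'], 0), m['conf'])
    return [m for m in matches if m['conf'] == best[m['span']] and m['conf'] > 0]
```

For example, if 'Donetsk, Ukraine' scored 11 and 'Donetsk, Russia' scored 1 for the same token span, only the Ukraine match survives.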
def filter_matches_by_geom_area( list_loc, dict_geospatial_config, index_token_start = 1, index_token_end = 2, index_osm_id = 5, index_geom = 4, same_osmid_only = False ) :
	"""
	filter a list of matches to favour locations with the largest area (e.g. liverpool city border > liverpool admin centre point, liverpool city in UK > liverpool suburb in AU). this is helpful to choose a single match from a list of matches with the same confidence, as normally people are referring to the larger, more populated area

	:param list list_loc: list of locations with geom information from geo_parse_lib.create_matched_location_list(). this list will be filtered with rows removed that have a smaller geom area than another match for the same phrase
	:param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()
	:param int index_token_start: index of matched token start position in list_loc
	:param int index_token_end: index of matched token end position in list_loc
	:param int index_osm_id: index of OSM ID in list_loc
	:param int index_geom: index of serialized OpenGIS geom in list_loc
	:param bool same_osmid_only: if True limit loc removal to same OSMIDs i.e. remove smaller geoms for the same OSMID if several geoms matched (e.g. admin nodes and city polygons or several island polygons)
	"""

	# check args without defaults
	if not isinstance( list_loc, list ) :
		raise Exception( 'invalid list_loc' )
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	dictGeomIndex = None

	# remove any location matches with the same tokens and OSMID but which have a smaller geom
	for nIndexLoc1 in range(len(list_loc)) :
		if list_loc[nIndexLoc1] != None :
			tupleOSMID1 = list_loc[nIndexLoc1][index_osm_id]
			tuplePhrase1 = ( list_loc[nIndexLoc1][index_token_start], list_loc[nIndexLoc1][index_token_end] )
			strGeom1 = list_loc[nIndexLoc1][index_geom]
			for nIndexLoc2 in range(nIndexLoc1,len(list_loc)) :
				if list_loc[nIndexLoc2] != None :
					tupleOSMID2 = list_loc[nIndexLoc2][index_osm_id]
					strGeom2 = list_loc[nIndexLoc2][index_geom]
					tuplePhrase2 = ( list_loc[nIndexLoc2][index_token_start], list_loc[nIndexLoc2][index_token_end] )
					if tuplePhrase1 == tuplePhrase2 :
						if (same_osmid_only == False) or (tupleOSMID1 == tupleOSMID2) :
							# index location geoms using the OSMID array as a unique identifier to calculate shapes
							# dictGeomIndex = dict <- calc_geom_index() = { tupleOSMID : [ ( shapely_geom_prepared, shapely_envelope, shapely_geom, shapely_representative_point, dict_OSM_tag, geom_serialized ), ... ] }
							# note: only create it when its needed as its slow
							if dictGeomIndex == None :
								dictGeomIndex = calc_geom_index( list_loc, index_geom = index_geom, index_id = index_osm_id, index_osm_tag = None )

							# get the polygon for this location (osmids can have multiple polygons so get the right one e.g. island groups)
							shapeLoc1 = None
							for tupleGeomData in dictGeomIndex[tupleOSMID1] :
								if strGeom1 == tupleGeomData[5] :
									shapeLoc1 = tupleGeomData[2]
									break

							shapeLoc2 = None
							for tupleGeomData in dictGeomIndex[tupleOSMID2] :
								if strGeom2 == tupleGeomData[5] :
									shapeLoc2 = tupleGeomData[2]
									break

							# if no shape available always use the option with a shape
							if shapeLoc1 == None :
								list_loc[nIndexLoc1] = None
								break
							if shapeLoc2 == None :
								list_loc[nIndexLoc2] = None
								break

							# note: points and lines have a zero area
							if shapeLoc2.area > shapeLoc1.area :
								# loc1 has a smaller area so remove it
								#dict_geospatial_config['logger'].info( 'AREA REJECT ' + str(tupleOSMID1) + ' : ' + str(tuplePhrase1) )
								list_loc[nIndexLoc1] = None
								break

	while list_loc.count( None ) :
		list_loc.remove( None )

	if dictGeomIndex != None :
		del dictGeomIndex

	# all done
	return
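The largest-area preference can be shown standalone. This sketch uses the shoelace formula in place of shapely's `.area` (which the function above relies on); the `(phrase, ring)` candidate layout is hypothetical.

```python
# Sketch: for each phrase keep the candidate geometry with the largest area
# (illustrative; geoparsepy uses shapely geometries, not raw vertex rings).
def shoelace_area(ring):
    # ring: list of (x, y) vertices of a simple polygon, open or closed
    n = len(ring)
    total = 0.0
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        total += x1 * y2 - x2 * y1
    return abs(total) / 2.0

def keep_largest_area(candidates):
    # points and lines get a zero area, mirroring shapely's behaviour
    best = {}
    for phrase, ring in candidates:
        area = shoelace_area(ring) if len(ring) >= 3 else 0.0
        if phrase not in best or area > best[phrase][0]:
            best[phrase] = (area, ring)
    return {p: ring for p, (a, ring) in best.items()}
```

So a city boundary polygon wins over an admin-centre point matched for the same phrase.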
def filter_matches_by_region_of_interest( list_loc, list_regions_of_interest, dict_geospatial_config, index_osm_parents = 7 ) :
	"""
	filter a list of matches by region of interest. all locations that do not have a parent in the region of interest list will be removed from the list

	:param list list_loc: list of locations with geom information from geo_parse_lib.create_matched_location_list(). this list will be filtered with rows removed that do not have parents in the region of interest
	:param list list_regions_of_interest: list of OSM IDs for regions of interest
	:param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()
	:param int index_osm_parents: index of OSM IDs of super regions of this matched location in list_loc
	"""

	# check args without defaults
	if not isinstance( list_loc, list ) :
		raise Exception( 'invalid list_loc' )
	if not isinstance( list_regions_of_interest, list ) :
		raise Exception( 'invalid list_regions_of_interest' )
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	# nothing to do?
	if len(list_regions_of_interest) == 0 :
		raise Exception( 'empty list_regions_of_interest' )

	# remove location matches that do not have one of these parents in their super region list
	for nIndexLoc in range(len(list_loc)) :
		if list_loc[nIndexLoc] != None :
			bFound = False
			for nIndexParentID in range(len(list_regions_of_interest)) :
				nOSMIDParent = list_regions_of_interest[nIndexParentID]
				if nOSMIDParent in list_loc[nIndexLoc][index_osm_parents] :
					bFound = True
					break
			if bFound == False :
				list_loc[nIndexLoc] = None

	while list_loc.count( None ) :
		list_loc.remove( None )

	# all done
	return
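The parent-region filter above reduces to a set intersection test per row. A minimal sketch, using a hypothetical `(name, parent_osm_ids)` row shape and made-up OSM IDs:

```python
# Sketch of region-of-interest filtering: drop any match whose parent OSM ID
# list contains none of the regions of interest (illustrative row shape).
def filter_by_region(matches, regions_of_interest):
    roi = set(regions_of_interest)
    return [m for m in matches if roi & set(m[1])]
```

A non-destructive list comprehension is used here for clarity; the real function voids rows in-place instead.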
def calc_multilingual_osm_name_set( dict_osm_tags, dict_geospatial_config ) :
	"""
	return a list of name variants from the OSM tag set for a location. this will include the name, alternative and short names, abbreviations and language variants

	:param dict dict_osm_tags: OSM tags for this location
	:param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()

	:return: list of name variants for this location
	:rtype: list
	"""

	# check args without defaults
	if not isinstance( dict_osm_tags, dict ) :
		raise Exception( 'invalid dict_osm_tags' )
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	# init
	listResult = []

	# compile a list of tags to check
	# deliberately put the native name last as we would like to get the first language on the supported lang list first for human readability
	listTagsToCheck = []
	for strLangCode in dict_geospatial_config['lang_codes'] :
		listTagsToCheck.extend( [ 'name:' + strLangCode, 'alt name:' + strLangCode, 'old name:' + strLangCode ] )
	listTagsToCheck.extend( [ 'name', 'ref', 'loc ref', 'nat ref', 'old ref', 'reg ref', 'ISO3166-1', 'ISO3166-1:alpha2', 'ISO3166-1:alpha3' ] )
	listTagsToCheck.extend( [ 'alt name', 'int name', 'loc name', 'nat name', 'old name', 'reg name', 'short name', 'name:abbreviation', 'name:simple', 'sorting name' ] )

	# check for OSM reference tags and add them (avoid duplicates)
	for strTag in listTagsToCheck :
		if strTag in dict_osm_tags :
			if not dict_osm_tags[ strTag ] in listResult :
				listResult.append( dict_osm_tags[ strTag ] )

	# return the list
	return listResult
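The tag-priority lookup above can be sketched with a reduced tag set. This is an illustrative helper, not the geoparsepy function, and it checks only a few of the tags the real implementation covers:

```python
# Sketch: collect de-duplicated name variants from an OSM tag dict, checking
# language-specific tags before the native 'name' tag (illustrative subset).
def name_variants(osm_tags, lang_codes):
    tags_to_check = []
    for lang in lang_codes:
        tags_to_check += ['name:' + lang, 'alt name:' + lang, 'old name:' + lang]
    tags_to_check += ['name', 'alt name', 'short name']
    variants = []
    for tag in tags_to_check:
        value = osm_tags.get(tag)
        if value is not None and value not in variants:
            variants.append(value)
    return variants
```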
def calc_best_osm_name( target_lang, dict_osm_tags, dict_geospatial_config ) :
	"""
	return a location name in a target language or the next best alternative. the default name is the name in the native language of the region.

	:param str target_lang: preferred language code for the name
	:param dict dict_osm_tags: OSM tags for this location
	:param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()
	:return: location name
	:rtype: str
	"""

	# check args without defaults
	if not isinstance( target_lang, str ) :
		raise Exception( 'invalid target_lang' )
	if not isinstance( dict_osm_tags, dict ) :
		raise Exception( 'invalid dict_osm_tags' )
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	# check for names in the target language
	if 'name:' + target_lang in dict_osm_tags :
		return dict_osm_tags[ 'name:' + target_lang ]
	if 'alt name:' + target_lang in dict_osm_tags :
		return dict_osm_tags[ 'alt name:' + target_lang ]
	if 'old name:' + target_lang in dict_osm_tags :
		return dict_osm_tags[ 'old name:' + target_lang ]

	# sometimes a sorting name is created (english)
	if target_lang == 'en' :
		if 'sorting name' in dict_osm_tags :
			return dict_osm_tags[ 'sorting name' ]

	# otherwise default to the basic name in the native language of the location
	if 'name' in dict_osm_tags :
		return dict_osm_tags[ 'name' ]

	# fail! this should be impossible as name is always present
	raise Exception( 'location found without a sensible name : ' + repr(dict_osm_tags) )
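The fallback chain above can be condensed into a standalone sketch (the config argument is omitted here since it is only validated, not consulted, in the name selection itself):

```python
def best_name(target_lang, tags):
	# same preference order as calc_best_osm_name: target-language names first,
	# then the english 'sorting name' special case, then the native 'name'
	for key in ('name:' + target_lang, 'alt name:' + target_lang, 'old name:' + target_lang):
		if key in tags:
			return tags[key]
	if target_lang == 'en' and 'sorting name' in tags:
		return tags['sorting name']
	if 'name' in tags:
		return tags['name']
	raise ValueError('location without a sensible name: %r' % tags)

# hypothetical OSM tags for Vienna
tags = {'name': 'Wien', 'name:en': 'Vienna'}
print(best_name('en', tags))  # 'Vienna' - target language available
print(best_name('fr', tags))  # 'Wien' - falls back to the native name
```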
def calc_inverted_index( list_data, dict_geospatial_config, index_phrase = 6, index_id = 2 ) :
	"""
	compile an inverted index from a list of arbitrary data where one column is a phrase string. the inverted index key is the phrase as a tokenized tuple e.g. ('new','york'). the inverted index value is an ID value linking back to the original list of data e.g. OSM ID tuple or just a row index.

	| note: the default index values are preset for the list of cached locations from geo_preprocess_lib.cache_preprocessed_locations()

	:param list list_data: list of data to create an inverted index for e.g. result of geo_preprocess_lib.cache_preprocessed_locations()
	:param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()
	:param int index_phrase: column index in list_data of phrase text to use as a key for the inverted index (str OR list/tuple of str)
	:param int index_id: column index in list_data of an ID that the inverted index will point to (str/list/tuple). A value of None means the list_data row index is used as the ID
	:return: inverted index where key = phrase as a tuple, value = set of ID values for the original data list
	:rtype: dict
	"""

	# check args without defaults
	if not isinstance( list_data, list ) :
		raise Exception( 'invalid list_data' )
	if not isinstance( dict_geospatial_config, dict ) :
		raise Exception( 'invalid dict_geospatial_config' )

	# anything to do?
	if len(list_data) == 0 :
		return {}

	# check indexes are OK
	if len(list_data[0]) <= index_phrase :
		raise Exception( 'phrase index >= len(source data)' )
	if (index_id != None) and (len(list_data[0]) <= index_id) :
		raise Exception( 'ID index >= len(source data)' )

	dictInvertedIndex = {}
	for nDocIndex in range(len(list_data)) :

		# get source phrase
		objPhrase = list_data[nDocIndex][index_phrase]

		# make a list of phrases to add for this document
		if isinstance( objPhrase, list ) :
			listPhrase = objPhrase
		elif isinstance( objPhrase, tuple ) :
			listPhrase = list( objPhrase )
		elif isinstance( objPhrase, str ) :
			listPhrase = [objPhrase]
		else :
			raise Exception( 'object type of phrase not list, tuple or str' )

		# add each phrase
		for strPhrase in listPhrase :

			# calc an ngram token tuple for this phrase
			tuplePhrase = tuple( soton_corenlppy.common_parse_lib.tokenize_sentence( str(strPhrase), dict_geospatial_config ) )

			# add the token tuple to the inverted index, using the document row index
			# (or the ID column, if one is specified) as the value
			objIndex = nDocIndex
			if index_id != None :
				objIndex = list_data[nDocIndex][index_id]
			if tuplePhrase in dictInvertedIndex :
				dictInvertedIndex[ tuplePhrase ].add( objIndex )
			else :
				dictInvertedIndex[ tuplePhrase ] = set( [objIndex] )

	# return inverted index
	return dictInvertedIndex
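The index-building step can be sketched standalone. The real function tokenizes with soton_corenlppy.common_parse_lib.tokenize_sentence(), which is config-driven; a plain lowercase whitespace split is substituted here purely for illustration:

```python
# stand-in for soton_corenlppy.common_parse_lib.tokenize_sentence()
def tokenize(text):
	return text.lower().split()

# hypothetical (phrase, OSM ID tuple) rows
rows = [
	('new york', (1001,)),
	('york', (1002,)),
	('New York', (1003,)),  # tokenizes to the same key as row 0
]

# key = tokenized phrase tuple, value = set of IDs sharing that phrase
inverted_index = {}
for phrase, osm_id in rows:
	key = tuple(tokenize(phrase))
	inverted_index.setdefault(key, set()).add(osm_id)

print(inverted_index[('new', 'york')])  # both OSM IDs named 'new york'
```

Collecting IDs into a set per key is what lets the geoparser later retrieve every OSM location sharing a common name or name variant for a token in the text.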
def calc_osmid_lookup( cached_locations ) :
	"""
	create an index of OSM ID to row indexes in the cached locations

	:param list cached_locations: list of cached locations from geo_preprocess_lib.cache_preprocessed_locations()
	:return: lookup table mapping an OSM ID to a set of row indexes in the cached locations (an OSM ID can have many entries, each with a different geom, such as island groups)
	:rtype: dict
	"""

	# check args without defaults
	if not isinstance( cached_locations, list ) :
		raise Exception( 'invalid cached_locations' )

	osmid_lookup = {}
	for nIndexLoc in range(len(cached_locations)) :
		tupleID = tuple( cached_locations[nIndexLoc][2] )
		if not tupleID in osmid_lookup :
			osmid_lookup[tupleID] = set([])
		osmid_lookup[ tupleID ].add( nIndexLoc )

	return osmid_lookup
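A minimal sketch of the lookup construction, using hypothetical cached-location rows where column 2 holds the OSM ID (as in the real cache layout, one OSM ID can appear in several rows with different geometries):

```python
# hypothetical cached rows: [geom, name, osm_id_list]
cached_locations = [
	['geom_a', 'name_a', [1001]],
	['geom_b', 'name_b', [1002]],
	['geom_c', 'name_c', [1001]],  # same OSM ID, different geom (e.g. island group)
]

# map OSM ID tuple -> set of row indexes in cached_locations
osmid_lookup = {}
for row_index, row in enumerate(cached_locations):
	osmid_lookup.setdefault(tuple(row[2]), set()).add(row_index)

print(osmid_lookup[(1001,)])  # rows 0 and 2 share OSM ID 1001
```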