# !/usr/bin/env python
# -*- coding: utf-8 -*-
"""
..
/////////////////////////////////////////////////////////////////////////
//
// (c) Copyright University of Southampton, 2012
//
// Copyright in this software belongs to University of Southampton,
// Highfield, Southampton, SO17 1BJ, United Kingdom
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Stuart E. Middleton
// Created Date : 2014/04/15
// Created for Project: REVEAL
//
/////////////////////////////////////////////////////////////////////////
//
// Dependencies: Source code derived from original location_extraction python code in TRIDEC
//
/////////////////////////////////////////////////////////////////////////
'''
Geoparsing is based on named entity matching against OpenStreetMap (OSM) locations. All locations with names that match tokens will be selected from a target text sentence. This will result in a set of OSM locations, all with a common name or name variant, for each token in the text. Geoparsing includes the following features:
* *token expansion* using location name variants (i.e. OSM multi-lingual names, short names and acronyms)
* *token expansion* using location type variants (e.g. street, st.)
* *token filtering* of single token location names against WordNet (non-nouns), language specific stoplists and peoples first names (nltk.corpus.names.words()) to reduce false positive matches
* *prefix checking* when matching in case a first name prefixes a location token(s) to avoid matching peoples full names as locations (e.g. Victoria Derbyshire != Derbyshire)
Location disambiguation is the process of choosing which of a set of possible OSM locations, all with the same name, is the best match. Location disambiguation is based on an evidential approach, with evidential features detailed below in order of importance:
* *token subsumption*, rejecting smaller phrases over larger ones (e.g. 'New York' will prefer [New York, USA] to [York, UK])
* *nearby parent region*, preferring locations with a parent region also appearing within a semantic distance (e.g. 'New York in USA' will prefer [New York, USA] to [New York, BO, Sierra Leone])
* *nearby locations*, preferring locations with close by or overlapping locations within a semantic distance (e.g. 'London St and Commercial Road' will select from road name choices with the same name based on spatial proximity)
* *nearby geotag*, preferring locations that are close by or overlapping a geotag
* *general before specific*, rejecting locations with a higher admin level (or no admin level at all) compared to locations with a lower admin level (e.g. 'New York' will prefer [New York, USA] to [New York, BO, Sierra Leone])
Currently the following languages are supported:
* English, French, German, Italian, Portuguese, Russian, Ukrainian
* All other languages will work but there will be no language specific token expansion available
This geoparsing algorithm uses a large memory footprint, proportional to the number of cached locations, to maximize matching speed. It can be naively parallelized, with multiple geoparse processes loaded with different sets of locations and the geoparse results aggregated in a last process where location disambiguation is applied. This approach has been validated across an APACHE Storm cluster.
"""
# Performance data
# - geo parse [1 process]
# + 10,000 tweets with 1% geotag = 1720s = 5.8 items / sec
# + 3,800 instagram each with geotag = 6702s = 0.56 items / sec
# - geo aggregate [1 process]
# + 10,000 tweets with 1% geotag = 1642s = 6.0 items / sec
# + 3,800 instagram each with geotag = 955s = 3.7 items / sec
import os, re, sys, copy, collections, codecs, string, configparser, traceback, datetime, time, math
import nltk, nltk.stem.porter, nltk.corpus, numpy, shapely, shapely.speedups, shapely.prepared, shapely.wkt, shapely.geometry
from nltk.util import ngrams
from nltk.corpus import wordnet
import soton_corenlppy
import pkg_resources
# closed set of location categories that calc_OSM_type() can return
list_osm_types = [
    'transport',
    'building',
    'admin',
    'other',
]
def _log_info(dict_config, str_message):
    """Write an info message to the config's logger, if a logger is configured."""
    logger = dict_config['logger']
    if logger is not None:
        logger.info(str_message)


def _iter_corpus_lines(str_file):
    """Yield the usable lines of a UTF-8 corpus file (comments and lines of <= 1 char are skipped)."""
    # the context manager guarantees the handle is closed once the file has been consumed
    with codecs.open(str_file, 'r', 'utf-8') as read_handle:
        for line in read_handle:
            # remove newline at end (might not have one if last line)
            line = line.rstrip('\n')
            line = line.rstrip('\r')
            # remove UTF8 byte-order mark at start of file
            line = line.lstrip('\ufeff')
            if (len(line) > 1) and (not line.startswith('#')):
                yield line


def _load_phrase_list(str_file, dict_config, list_phrases):
    """Append every cleaned phrase of a corpus file that is not yet in list_phrases (in place)."""
    for line in _iter_corpus_lines(str_file):
        str_clean = soton_corenlppy.common_parse_lib.clean_text(line, dict_config)
        if (len(str_clean) > 0) and (str_clean not in list_phrases):
            list_phrases.append(str_clean)


def _load_location_types(str_file, dict_types, dict_config):
    """Extend dict_types with a location type corpus and drop its title prefixes from the first name set."""
    list_type_info = read_location_type_corpus(str_file, dict_config)
    dict_types['title'].extend(list_type_info[0])
    dict_types['type'].extend(list_type_info[1])

    # remove from name set all location title prefixes
    # these will otherwise be assumed to be part of a name
    # (e.g. west = title prefix, west = boys name, west london is OK but joe london is not)
    for list_titles in list_type_info[0]:
        for tuple_entry in list_titles:
            if tuple_entry[0] != 'suffix':
                if tuple_entry[-1] in dict_config['first_names']:
                    dict_config['first_names'].remove(tuple_entry[-1])


def _load_gazeteer(str_file, dict_config):
    """Parse a gazeteer file into a list of [OSMID (int), OSMType, alt_name, ...] entries."""
    list_gaz = []
    for line in _iter_corpus_lines(str_file):
        list_entry = [str_token.strip() for str_token in line.split(',')]

        if len(list_entry) < 3:
            _log_info(dict_config, 'gaz file ' + str_file + ' has invalid OSMID entry with < 3 tokens (ignored - expected OSMID, OSMType, alt_name, ...)')
            # actually ignore the entry, as the log message promises
            continue

        # a non-numeric OSMID raises ValueError (unchanged behaviour)
        list_entry[0] = int(list_entry[0])

        if list_entry[1] not in ('way', 'node', 'relation'):
            _log_info(dict_config, 'gaz file ' + str_file + ' has invalid OSMType entry - expected relation | way | node for ' + list_entry[1])
        else:
            list_gaz.append(list_entry)
    return list_gaz


def get_geoparse_config(**kwargs):
    """
    return a geospatial config object for this specific set of languages. the config object is built by soton_corenlppy.common_parse_lib.get_common_config() and then extended with every language specific geospatial corpus file found in the corpus directory, such as street name variants.

    geoparse config settings are below:
        * *lower_tokens* = True, since locations are not always referenced in text as capitalized Proper Nouns (overrides variable keyword args)
        * *apostrophe_handling* = 'strip' (overrides variable keyword args)
        * *whitespace* and *punctuation* = geoparse specific defaults, used only if the caller does not provide them
        * *stoplist* = list, the common config stoplist extended with the geospatial stoplist of each selected language
        * *whitelist* = list, names that are always accepted as place names
        * *blacklist* = list, names that are never accepted as place names
        * *building_types* = dict, containing building type name variants loaded from each selected language's corpus file
        * *street_types* = dict, containing street type name variants loaded from each selected language's corpus file
        * *admin_types* = dict, containing admin region type name variants loaded from each selected language's corpus file
        * *gazeteers* = dict, keyed by gazeteer file name, containing local gazeteer name variants not provided in the OSM database for specific OSM IDs
        * *use_wordnet* = True, remove 1 token location names that appear in wordnet with non location meanings

    | note: for a list of default config settings see soton_corenlppy.common_parse_lib.get_common_config()
    | note: a config object approach is used, as opposed to a global variable, to allow geo_parse_lib functions to work in a multi-threaded environment

    :param kwargs: variable argument to override any default config values. *corpus_dir* selects the directory holding the corpus files (default is the directory of this module) and *lang_codes* selects the languages to load
    :return: configuration settings to be used by all geo_parse_lib functions
    :rtype: dict
    :raises ValueError: if a gazeteer entry has a non-numeric OSMID
    """
    dictArgs = copy.copy(kwargs)

    # default corpus directory is where the python lib package has been installed to
    # note: a caller supplied corpus_dir must also be used for the file lookups below
    if 'corpus_dir' in dictArgs:
        strCorpusDir = dictArgs['corpus_dir']
    else:
        if pkg_resources.resource_exists(__name__, 'geo_parse_lib.py'):
            # if run as an installed python lib
            strCorpusDir = os.path.dirname(pkg_resources.resource_filename(__name__, 'geo_parse_lib.py'))
        else:
            # if run as a standalone file in a dir
            strCorpusDir = os.path.dirname(__file__)
        dictArgs['corpus_dir'] = strCorpusDir

    # always use lower case for geoparse work as microblog references to locations do not follow nice camel case
    # note: do this before get_common_config() because we want things like stopwords and names to be lowercase for subsequent matching
    dictArgs['lower_tokens'] = True

    # convert 's to s to make entity matching easier
    dictArgs['apostrophe_handling'] = 'strip'

    # setup whitespace and punctuation for geoparse work (unless caller has provided something else)
    if 'whitespace' not in dictArgs:
        dictArgs['whitespace'] = '[]"\u201a\u201b\u201c\u201d()'
    if 'punctuation' not in dictArgs:
        # plain string like the whitespace default: a stray trailing comma used to make this a 1-tuple
        # raw string keeps the literal backslash without relying on an invalid escape sequence
        dictArgs['punctuation'] = r""",;\/:+-#~&*=!?"""

    # setup common values
    dict_geospatial_config = soton_corenlppy.common_parse_lib.get_common_config(**dictArgs)

    # check single word place names against wordnet to avoid common words mis-matching
    dict_geospatial_config['use_wordnet'] = dictArgs.get('use_wordnet', True)

    # languages to load corpus files for (fall back to the common config value if the caller gave none)
    if 'lang_codes' in dictArgs:
        listLangCodes = dictArgs['lang_codes']
    else:
        listLangCodes = dict_geospatial_config['lang_codes']

    # load local language specific geospatial stoplist
    # created by ITINNO for use with location name tokens
    listStoplist = dict_geospatial_config['stoplist']
    for strCode in listLangCodes:
        strStoplistFile = os.path.join(strCorpusDir, 'corpus-geo-stoplist-' + strCode + '.txt')
        if os.path.isfile(strStoplistFile):
            _log_info(dict_geospatial_config, 'loading stoplist from ' + strStoplistFile)
            _load_phrase_list(strStoplistFile, dict_geospatial_config, listStoplist)
        else:
            _log_info(dict_geospatial_config, 'missing stoplist file ' + strStoplistFile + ' (not loaded)')
    dict_geospatial_config['stoplist'] = listStoplist

    # load whitelist of good names (to avoid wordnet rejection)
    # and blacklist of bad names (to avoid OSM names that match very common words)
    for strListName in ('whitelist', 'blacklist'):
        listPhrases = []
        strListFile = os.path.join(strCorpusDir, 'corpus-geo-' + strListName + '.txt')
        if os.path.isfile(strListFile):
            _log_info(dict_geospatial_config, 'loading ' + strListName + ' from ' + strListFile)
            _load_phrase_list(strListFile, dict_geospatial_config, listPhrases)
        dict_geospatial_config[strListName] = listPhrases

    #
    # load all language specific location_type corpus files
    # to get the building, street and admin type information (stopwords, prefix/suffix names)
    #
    dictBuildingTypes = {'title': [], 'type': []}
    dictStreetTypes = {'title': [], 'type': []}
    dictAdminTypes = {'title': [], 'type': []}
    listTypeCorpora = [
        ('corpus-buildingtype-', 'building types', dictBuildingTypes),
        ('corpus-streettype-', 'street types', dictStreetTypes),
        ('corpus-admintype-', 'admin types', dictAdminTypes),
    ]
    for strCode in listLangCodes:
        for strFilePrefix, strLabel, dictTypes in listTypeCorpora:
            strCorpusFile = os.path.join(strCorpusDir, strFilePrefix + strCode + '.txt')
            if os.path.isfile(strCorpusFile):
                _log_info(dict_geospatial_config, 'loading ' + strLabel + ' from ' + strCorpusFile)
                _load_location_types(strCorpusFile, dictTypes, dict_geospatial_config)

    dict_geospatial_config['building_types'] = dictBuildingTypes
    dict_geospatial_config['street_types'] = dictStreetTypes
    dict_geospatial_config['admin_types'] = dictAdminTypes

    # load any available gazeteer sources
    dictGaz = {}
    for strCode in listLangCodes:
        strGazFile = os.path.join(strCorpusDir, 'gazeteer-' + strCode + '.txt')
        if os.path.isfile(strGazFile):
            _log_info(dict_geospatial_config, 'loading gazeteer from ' + strGazFile)
            dictGaz[strGazFile] = _load_gazeteer(strGazFile, dict_geospatial_config)
    dict_geospatial_config['gazeteers'] = dictGaz

    # all done
    return dict_geospatial_config
def is_good_place_name(phrase, dict_osm_tags, dict_geospatial_config):
    """
    check if a phrase is a good placename (building, street, admin region etc.) for use in text matching. the OSM database contains some building names that are really house numbers (e.g. 50) and a few basic mistakes which need to be pruned to avoid poor quality matches.

    rejects short names, only numbers, only stoplist names.
    accepts whitelisted names, short highway names e.g. 'M3' and multi-token admin regions.

    :param str phrase: OSM location phrase to check if it makes a good place name
    :param dict dict_osm_tags: OSM tags for this location
    :param dict dict_geospatial_config: config object returned from geoparse_lib.get_geoparse_config()
    :return: True if this is a good location name, False if it should be rejected for token matching
    :rtype: bool
    :raises Exception: if any argument has the wrong type
    """
    # check args without defaults
    if not isinstance(phrase, str):
        raise Exception('invalid phrase')
    if not isinstance(dict_osm_tags, dict):
        raise Exception('invalid dict_osm_tags')
    if not isinstance(dict_geospatial_config, dict):
        raise Exception('invalid dict_geospatial_config')

    # if in whitelist accept name is ok (check first so anything that matches is OK)
    if phrase in dict_geospatial_config['whitelist']:
        return True

    # must always have more than 1 letter in phrase
    if len(phrase) <= 1:
        return False

    # if in blacklist name is not ok
    if phrase in dict_geospatial_config['blacklist']:
        return False

    # get tokens
    listTokens = soton_corenlppy.common_parse_lib.tokenize_sentence(phrase, dict_geospatial_config)

    bHighway = 'highway' in dict_osm_tags
    bAdminLevel = 'admin level' in dict_osm_tags

    # reject phrases that are simply too small
    # (except road names which might have M3 type abbreviations or official admin areas like USA)
    if (not bHighway) and (not bAdminLevel) and (len(phrase) <= 3):
        return False

    # reject 1g phrases unless they are roads (e.g. M3) or admin regions (e.g. London)
    # and reject all stop word phrases unless its a 2gram+ region name (as region names can be full of stop words)
    # note: the stoplist check is only evaluated in the branches that need it
    if bAdminLevel or ('place' in dict_osm_tags) or ('is in' in dict_osm_tags):
        # admin => allow all 2g+ and 1g phrases IF they are not stop words
        if len(listTokens) == 1:
            if soton_corenlppy.common_parse_lib.is_all_stoplist(listTokens, dict_geospatial_config):
                return False
    elif bHighway:
        # roads => allow any IF they are not stop words
        if soton_corenlppy.common_parse_lib.is_all_stoplist(listTokens, dict_geospatial_config):
            return False
    else:
        # other => reject 1g phrases and check 2g+ is not only stop words
        # since OpenStreetMap has a lot of rubbish 1g phrases like 'station', '24', 'building' ...
        if len(listTokens) == 1:
            return False
        if soton_corenlppy.common_parse_lib.is_all_stoplist(listTokens, dict_geospatial_config):
            return False

    # reject phrases with only numbers e.g. '24' (such as flat number 24)
    # note: a phrase with no tokens at all is rejected here as well
    if all(strToken.isdigit() for strToken in listTokens):
        return False

    # lookup single tokens in wordnet dictionary
    # use all known wordnet languages as we do not know in advance the language these will be matched against
    # note: explicit comparison kept so only a True (or 1) setting enables the check
    if (dict_geospatial_config['use_wordnet'] == True) and (len(listTokens) == 1):  # noqa: E712
        for strLangISO639_2 in wordnet.langs():
            # if location name is also a non-noun word its a bad name
            listSyn = wordnet.synsets(phrase, pos='asrv', lang=strLangISO639_2)
            if len(listSyn) > 0:
                return False

    # all done
    return True
def expand_hashtag(phrase, dict_geospatial_config):
    """
    return a hashtag for a phrase (expects clean phrase text). the hashtag is the phrase with all spaces removed and a leading '#' e.g. 'new york' -> '#newyork'

    :param str phrase: clean location phrase to convert into a hashtag
    :param dict dict_geospatial_config: config object returned from geoparse_lib.get_geoparse_config()
    :return: hashtag text, or an empty string if the phrase is empty or contains only spaces
    :rtype: str
    :raises Exception: if dict_geospatial_config is not a dict
    """
    # check args without defaults
    if not isinstance(dict_geospatial_config, dict):
        raise Exception('invalid dict_geospatial_config')

    if len(phrase) == 0:
        return ''

    # remove spaces to make a hashtag word (e.g. newyork)
    # note: str.replace() already returns a new string so no copy of the phrase is needed
    strHashtag = phrase.replace(' ', '')
    if len(strHashtag) == 0:
        return ''

    return '#' + strHashtag
def _add_name_variant(list_names, str_name):
    """Append str_name to list_names unless it is empty or already present."""
    if (len(str_name) > 0) and (str_name not in list_names):
        list_names.append(str_name)


def expand_osm_alternate_names(tuple_osmid, phrase, dict_osm_tags, dict_geospatial_config):
    """
    return a list of location names expanded to include OSM ref, alt, language variants, street and building type variants etc. for example 'London St' will generate ['London Street', 'London St']. a hashtag version of every variant is added at the end. the returned list contains no empty or duplicated variants added by this function.

    :param tuple tuple_osmid: tuple of OSM IDs that represent this location. locations such as roads can have multiple OSM IDs which represent different ways along the length of the road.
    :param str phrase: cleaned name (not stemmed) of OSM location which should be expanded
    :param dict dict_osm_tags: OSM tags for this location
    :param dict dict_geospatial_config: config object returned from geoparse_lib.get_geoparse_config()
    :return: list of name variants for this location phrase (including the original phrase itself if it is a good place name)
    :rtype: list
    :raises Exception: if an argument has the wrong type
    """
    # check args without defaults
    if not isinstance(tuple_osmid, tuple):
        raise Exception('invalid tuple_osmid')
    if not isinstance(dict_osm_tags, dict):
        raise Exception('invalid dict_osm_tags')
    if not isinstance(dict_geospatial_config, dict):
        raise Exception('invalid dict_geospatial_config')

    # always keep the initial phrase (as long as it is a good place name)
    if is_good_place_name(phrase, dict_osm_tags, dict_geospatial_config):
        listResult = [phrase]
    else:
        listResult = []

    # compile a list of tags to check
    # note: 'alt_name:en' actually appears in database as 'alt name:en'
    listTagsToCheck = ['name', 'ref', 'loc ref', 'nat ref', 'old ref', 'reg ref', 'ISO3166-1', 'ISO3166-1:alpha2', 'ISO3166-1:alpha3']
    listTagsToCheck.extend(['alt name', 'alt name:1', 'alt name:2', 'int name', 'loc name', 'nat name', 'old name', 'reg name', 'short name', 'name:abbreviation', 'name:simple', 'sorting name'])
    for strLangCode in dict_geospatial_config['lang_codes']:
        listTagsToCheck.extend(['name:' + strLangCode, 'alt name:' + strLangCode, 'old name:' + strLangCode])

    # check for OSM reference tags and add them (avoid duplicates)
    for strTag in listTagsToCheck:
        if strTag not in dict_osm_tags:
            continue

        # get name, removing (...) in cases like 'Montreal (06)' that do exist alas
        strPhraseTag = dict_osm_tags[strTag].split('(', 1)[0]

        # clean name
        strPhraseTag = soton_corenlppy.common_parse_lib.clean_text(strPhraseTag, dict_geospatial_config)

        # check its a good name
        if (len(strPhraseTag) > 0) and (strPhraseTag not in listResult):
            if is_good_place_name(strPhraseTag, dict_osm_tags, dict_geospatial_config):
                listResult.append(strPhraseTag)

    # expand building names
    # also use OSM types if possible to add a type to end of building so it triggers expansion
    bBuilding = False
    bEnglish = 'en' in dict_geospatial_config['lang_codes']

    if 'railway' in dict_osm_tags:
        if dict_osm_tags['railway'] == 'station':
            bBuilding = True
            if bEnglish:
                if ' station' not in phrase:
                    _add_name_variant(listResult, phrase + ' station')

                # NOTE(review): the subway variants are English words so they are kept under the
                # English-only branch - confirm this nesting against the original indented source
                if dict_osm_tags.get('station') == 'subway':
                    for strSuffix in ('subway', 'metro', 'underground'):
                        if strSuffix not in phrase:
                            _add_name_variant(listResult, phrase + ' ' + strSuffix)

    elif ('amenity' in dict_osm_tags) or ('building' in dict_osm_tags):
        # for buildings use its (english) type as a suffix for token expansion
        # e.g. Le Bataclan = Le Bataclan Theatre
        # note: the amenity tag takes priority over the building tag
        bBuilding = True
        if bEnglish:
            strTypeTag = 'amenity' if 'amenity' in dict_osm_tags else 'building'
            strType = dict_osm_tags[strTypeTag].replace('_', ' ')
            if (strType != 'yes') and (strType != 'user defined'):
                if (' ' + strType) not in phrase:
                    _add_name_variant(listResult, phrase + ' ' + strType)

    if bBuilding:
        if dict_geospatial_config['building_types'] is not None:
            listResult = expand_osm_location_types(listResult, dict_geospatial_config['building_types'], dict_osm_tags, dict_geospatial_config)
    else:
        # expand street names (dont allow buildings)
        if 'highway' in dict_osm_tags:
            if dict_geospatial_config['street_types'] is not None:
                listResult = expand_osm_location_types(listResult, dict_geospatial_config['street_types'], dict_osm_tags, dict_geospatial_config)

        # expand admin region names (dont allow buildings)
        if ('admin level' in dict_osm_tags) or ('place' in dict_osm_tags):
            if dict_geospatial_config['admin_types'] is not None:
                listResult = expand_osm_location_types(listResult, dict_geospatial_config['admin_types'], dict_osm_tags, dict_geospatial_config)

    # expand gazeteer entries for this OSMID (if any)
    # each entry is [OSMID, OSMType, alt_name, ...]
    if dict_geospatial_config['gazeteers'] is not None:
        for listGaz in dict_geospatial_config['gazeteers'].values():
            for listGazEntry in listGaz:
                # in OSM relations have a negative ID, nodes and way positive
                nID = listGazEntry[0]
                if listGazEntry[1] == 'relation':
                    nID = -1 * nID

                if nID in tuple_osmid:
                    for strPhrase in listGazEntry[2:]:
                        strPhraseClean = soton_corenlppy.common_parse_lib.clean_text(strPhrase, dict_geospatial_config)
                        # skips variants that are empty after cleaning as well as duplicates
                        _add_name_variant(listResult, strPhraseClean)

    # add hashtag versions of all expanded phrases
    # note: iterate over a snapshot as the result list grows while hashtags are added
    for strVariant in list(listResult):
        _add_name_variant(listResult, expand_hashtag(strVariant, dict_geospatial_config))

    # return what we have
    return listResult
# note: all _ are replaced with ' ' in OSM tag keys and values before they reach calc_OSM_type()
# tuples (not sets) are used so that membership is a plain == comparison, as in a chain of if statements

# OSM tag keys whose presence alone marks an admin region
_OSM_ADMIN_KEYS = ('admin level', 'place')

# OSM tag keys whose presence alone marks a transport location
_OSM_TRANSPORT_KEYS = ('railway', 'highway', 'aerialway', 'public transport', 'route')

# amenity values which are transport related (any other amenity is a building)
_OSM_TRANSPORT_AMENITIES = (
    'bus station', 'ev charging', 'charging station', 'ferry terminal', 'fuel',
    'parking', 'parking entrance', 'parking space', 'taxi',
)

# building values which are transport related (any other building is a building)
# 'hangar' is the OSM spelling, the misspelt 'hanger' is kept for backward compatibility
_OSM_TRANSPORT_BUILDINGS = ('train station', 'transportation', 'hangar', 'hanger')

# man made values which are buildings
_OSM_BUILDING_MAN_MADE = (
    'bunker silo', 'lighthouse', 'silo', 'wastewater plant', 'watermill',
    'water tower', 'water works', 'windmill', 'works',
)

# military values which are buildings
_OSM_BUILDING_MILITARY = ('bunker', 'barracks', 'naval base')

# tourism values which are buildings
_OSM_BUILDING_TOURISM = (
    'alpine hut', 'chalet', 'gallery', 'guest house', 'hostel', 'hotel',
    'motel', 'museum', 'theme park', 'wilderness hut', 'zoo',
)


def calc_OSM_type(dict_osm_tags, dict_geospatial_config):
    """
    calc the OSM tags to work out the type of OSM location. this is especially useful for high level filtering and visualization as OSM tags are quite detailed.

    tags are checked in a fixed priority order (admin keys, transport keys, amenity, building, landuse, man made, military, shop, tourism) and the first rule that matches decides the type.

    :param dict dict_osm_tags: OSM tags for this location
    :param dict dict_geospatial_config: config object returned from geoparse_lib.get_geoparse_config()
    :return: transport | building | admin | other
    :rtype: str
    :raises Exception: if an argument is not a dict
    """
    # check args without defaults
    if not isinstance(dict_osm_tags, dict):
        raise Exception('invalid dict_osm_tags')
    if not isinstance(dict_geospatial_config, dict):
        raise Exception('invalid dict_geospatial_config')

    if any(strKey in dict_osm_tags for strKey in _OSM_ADMIN_KEYS):
        return 'admin'

    if any(strKey in dict_osm_tags for strKey in _OSM_TRANSPORT_KEYS):
        return 'transport'

    # amenity and building tags always decide the type (default is a building)
    if 'amenity' in dict_osm_tags:
        if dict_osm_tags['amenity'] in _OSM_TRANSPORT_AMENITIES:
            return 'transport'
        return 'building'

    if 'building' in dict_osm_tags:
        if dict_osm_tags['building'] in _OSM_TRANSPORT_BUILDINGS:
            return 'transport'
        return 'building'

    # the remaining tags only decide the type for specific values, otherwise later tags are checked
    if 'landuse' in dict_osm_tags:
        if dict_osm_tags['landuse'] == 'railway':
            return 'transport'

    if 'man made' in dict_osm_tags:
        if dict_osm_tags['man made'] in _OSM_BUILDING_MAN_MADE:
            return 'building'

    if 'military' in dict_osm_tags:
        strValue = dict_osm_tags['military']
        if strValue == 'airfield':
            return 'transport'
        if strValue in _OSM_BUILDING_MILITARY:
            return 'building'

    if 'shop' in dict_osm_tags:
        return 'building'

    if 'tourism' in dict_osm_tags:
        if dict_osm_tags['tourism'] in _OSM_BUILDING_TOURISM:
            return 'building'

    # anything else return other
    return 'other'
def calc_OSM_linkedgeodata_uri(tuple_osmid, geom):
    """
    return a linkedgeodata URI for this OSMID (first ID in tuple only)

    :param tuple tuple_osmid: tuple of OSM IDs that represent this location. locations such as roads can have multiple OSM IDs which represent different ways along the length of the road.
    :param str geom: serialized OpenGIS geometry object e.g. 'POINT( lon lat )'
    :return: URI to linkedgeodata for first OSM ID in tuple
    :rtype: str
    :raises Exception: if tuple_osmid is not a non-empty tuple or geom is not a str
    """
    if (not isinstance(tuple_osmid, tuple)) or (len(tuple_osmid) == 0):
        raise Exception('invalid tuple_osmid')
    if not isinstance(geom, str):
        raise Exception('invalid geom')

    # we will only provide a URI for the first OSMID if the location is a complex one
    nOSMIDFirst = tuple_osmid[0]

    # relations have a negative number
    if nOSMIDFirst < 0:
        return "http://linkedgeodata.org/page/triplify/relation" + str(-1 * nOSMIDFirst)

    # nodes are points
    # note: accept both 'POINT(...)' and 'POINT (...)' as WKT allows whitespace before the bracket
    if geom.lstrip().lower().startswith('point'):
        return "http://linkedgeodata.org/page/triplify/node" + str(nOSMIDFirst)

    # otherwise assume its a way
    return "http://linkedgeodata.org/page/triplify/way" + str(nOSMIDFirst)
def calc_OSM_uri(tuple_osmid, geom):
    """
    return a openstreetmap URI for this OSMID (first ID in tuple only)

    :param tuple tuple_osmid: tuple of OSM IDs that represent this location. locations such as roads can have multiple OSM IDs which represent different ways along the length of the road.
    :param str geom: serialized OpenGIS geometry object e.g. 'POINT( lon lat )'
    :return: URI to openstreetmap for first OSM ID in tuple
    :rtype: str
    :raises Exception: if tuple_osmid is not a non-empty tuple or geom is not a str
    """
    if (not isinstance(tuple_osmid, tuple)) or (len(tuple_osmid) == 0):
        raise Exception('invalid tuple_osmid')
    if not isinstance(geom, str):
        raise Exception('invalid geom')

    # we will only provide a URI for the first OSMID if the location is a complex one
    nOSMIDFirst = tuple_osmid[0]

    # relations have a negative number
    if nOSMIDFirst < 0:
        return "http://www.openstreetmap.org/relation/" + str(-1 * nOSMIDFirst)

    # nodes are points
    # note: accept both 'POINT(...)' and 'POINT (...)' as WKT allows whitespace before the bracket
    if geom.lstrip().lower().startswith('point'):
        return "http://www.openstreetmap.org/node/" + str(nOSMIDFirst)

    # otherwise assume its a way
    return "http://www.openstreetmap.org/way/" + str(nOSMIDFirst)
def _append_location_variant( list_result, list_tokens, dict_osm_tags, dict_geospatial_config ) :
    """
    join tokens into a phrase (dropping any '*' no-match markers) and append it to list_result if it is not already present and is_good_place_name() accepts it
    """
    strNewPhrase = ' '.join( list_tokens ).replace( '*', '' )
    if strNewPhrase not in list_result :
        if is_good_place_name( strNewPhrase, dict_osm_tags, dict_geospatial_config ) == True :
            list_result.append( strNewPhrase )


def expand_osm_location_types( list_location_names, dict_location_types, dict_osm_tags, dict_geospatial_config ) :
    """
    given an set of location names return an expanded list with all known location type name variants. the original location name will always appear in the variant list.
    e.g. ['london st'] -> [[ 'london st', 'london street' ]]

    :param list list_location_names: list of clean location phrase variants for this location (e.g. full name, short name and abbreviation)
    :param dict dict_location_types: dict of types prefixes and type variants in form { 'title' : listTypePattern, 'type' : listTypePattern }. listTypePattern is generated using read_location_type_corpus()
    :param dict dict_osm_tags: OSM tags for this location
    :param dict dict_geospatial_config: config object returned from geoparse_lib.get_geoparse_config()

    :return: expanded list of location phrase variants
    :rtype: list

    :raises Exception: if list_location_names, dict_location_types or dict_geospatial_config has the wrong type
    """

    # dictLocationTypes = { 'title' : listTitleSet, 'type' : listTypeSet }
    # listTitleSet = listTypeSet = [ [ ('prefix' | 'suffix' | 'both', token, token ... ) ... N_phrase_variants ], ... N_types ]
    # phrase = (token1, token2, ...)
    # e.g. [ [ ('suffix','clinic'), ('suffix','day','centre') ], [ ('both','uni'),('both','university') ] ]

    # check args without defaults
    if not isinstance( list_location_names, list ) :
        raise Exception( 'invalid list_location_names' )
    if not isinstance( dict_location_types, dict ) :
        raise Exception( 'invalid dict_location_types' )
    if not isinstance( dict_geospatial_config, dict ) :
        raise Exception( 'invalid dict_geospatial_config' )

    # always keep the initial list of variant names
    listResult = copy.deepcopy( list_location_names )

    # loop on location name variants
    for nIndexPhrase, strPhrase in enumerate( list_location_names ) :

        # note: phrase is expected to be clean and spaces delimiting words
        # note: convert to tuple as we will be matching it to type phrases (which are tuples) and list != tuple
        tupleTokens = tuple( soton_corenlppy.common_parse_lib.tokenize_sentence( strPhrase, dict_geospatial_config ) )

        # remove first found title prefix or suffix from location name
        # note: types are sorted by gram order so bigrams will match before unigrams
        # note: this has a weakness for 'the big building org' will match title 'the' ==> not 'org' as well
        tupleTokensStripped = tupleTokens
        tupleTitlePrefix = ()
        tupleTitleSuffix = ()

        #
        # check for known place titles (take first match)
        #
        # NOTE(review): each entry is indexed here as a flat tuple ('prefix'|'suffix'|'both', token, ...)
        # whereas read_location_type_corpus() builds the title set as a list of lists of such tuples.
        # confirm the caller flattens the title set before passing it in, otherwise no title will ever match
        for tupleStopTokens in dict_location_types['title'] :
            nTitleLen = len(tupleStopTokens) - 1
            if len(tupleTokens) < nTitleLen :
                continue

            # check stop words in prefix
            if tupleStopTokens[0] in ( 'prefix', 'both' ) :
                if tupleStopTokens[1:] == tupleTokens[:nTitleLen] :
                    tupleTitlePrefix = tupleStopTokens[1:]
                    tupleTokensStripped = tupleTokens[nTitleLen:]
                    break

            # check stop words in suffix
            if tupleStopTokens[0] in ( 'suffix', 'both' ) :
                if tupleStopTokens[1:] == tupleTokens[-1*nTitleLen:] :
                    tupleTitleSuffix = tupleStopTokens[1:]
                    tupleTokensStripped = tupleTokens[:-1*nTitleLen]
                    break

        # do not allow location names where the title is the whole name
        # such location names are not informative enough to be useful for accurate matching
        # e.g. location = 'north london' ==> expand('london', 'north london')
        # e.g. location = 'north' ==> reject
        if len(tupleTokensStripped) == 0 :
            # note: listResult has all the original variants so we just void the bad one (voids are removed at the end)
            listResult[nIndexPhrase] = ''
            continue

        # add the phrase stripped of any title words to the variant list (even if it does not contain a type)
        # note: unlike the expanded variants below, '*' characters are not removed from this phrase
        strStrippedPhrase = ' '.join( tupleTokensStripped )
        if strStrippedPhrase not in listResult :
            if is_good_place_name( strStrippedPhrase, dict_osm_tags, dict_geospatial_config ) == True :
                listResult.append( strStrippedPhrase )

        # loop on each location type and take first match found
        # note: types are sorted by gram order so bigrams will match before unigrams
        tupleTokensNoType = ()
        listMatchedLocationType = []
        bMatched = False
        for listLocationType in dict_location_types[ 'type' ] :
            # listLocationType = [ ('suffix','t1.1','t1.2'), ('both','t2.1','t2.2', ... ) ]
            for tuplePhrase in listLocationType :
                nTypeLen = len(tuplePhrase) - 1
                if nTypeLen > len(tupleTokensStripped) :
                    continue

                # ignore types with the * no_match prefix
                # e.g. '*high school' --> ('suffix', '*high', 'school')
                if tuplePhrase[1].startswith( '*' ) :
                    continue

                if tuplePhrase[0] in ( 'prefix', 'both' ) :
                    if tuplePhrase[1:] == tupleTokensStripped[:nTypeLen] :
                        # strip the matched location type from the location name
                        tupleTokensNoType = tupleTokensStripped[nTypeLen:]
                        listMatchedLocationType = listLocationType
                        bMatched = True
                        break

                if tuplePhrase[0] in ( 'suffix', 'both' ) :
                    if tuplePhrase[1:] == tupleTokensStripped[-1*nTypeLen:] :
                        # strip the matched location type from the location name
                        tupleTokensNoType = tupleTokensStripped[:-1*nTypeLen]
                        listMatchedLocationType = listLocationType
                        bMatched = True
                        break

            if bMatched == True :
                break

        # if no match the original phrase is used as it has no known type to expand
        if bMatched == False :
            continue

        # do not allow location names where the type is the whole name
        # such location names are not informative enough to be useful for accurate matching
        # e.g. location = 'nuffield hospital' ==> expand('hospital)
        # e.g. location = 'hospital' ==> reject
        if len(tupleTokensNoType) == 0 :
            listResult[nIndexPhrase] = ''
            continue

        # generate all variants of the location name using the location type variant list
        listNameTokens = list( tupleTokensNoType )
        for tuplePhrase in listMatchedLocationType :
            listTypeTokens = list( tuplePhrase[1:] )

            # build the candidate token lists in the order prefix form then suffix form
            listCandidates = []
            if tuplePhrase[0] in ( 'prefix', 'both' ) :
                listCandidates.append( listTypeTokens + listNameTokens )
            if tuplePhrase[0] in ( 'suffix', 'both' ) :
                listCandidates.append( listNameTokens + listTypeTokens )

            for listNewPhrase in listCandidates :
                # add new phrase without any titles (if not already present)
                _append_location_variant( listResult, listNewPhrase, dict_osm_tags, dict_geospatial_config )

                # add new phrase with title prefix (if not already present)
                # note: the title is a tuple so it must be converted to a list before joining it to the phrase tokens
                if len(tupleTitlePrefix) > 0 :
                    _append_location_variant( listResult, list( tupleTitlePrefix ) + listNewPhrase, dict_osm_tags, dict_geospatial_config )

                # add new phrase with title suffix (if not already present)
                if len(tupleTitleSuffix) > 0 :
                    _append_location_variant( listResult, listNewPhrase + list( tupleTitleSuffix ), dict_osm_tags, dict_geospatial_config )

    # remove any empty names (voided entries)
    listResult = [ strName for strName in listResult if strName != '' ]

    # return what we have after expansion
    return listResult
def read_location_type_corpus( filename, dict_geospatial_config ) :
    r"""
    read a location type corpus containing information for location prefix variants (e.g. north london) and location type name variants (e.g. london street, london st)

    | note : variants are encoded as a list [ [ ('prefix' | 'suffix' | 'both', token, token ... ) ... ], ... ]
    | e.g. [ [ ('suffix','clinic'), ('suffix','day','centre') ], [ ('both','uni'),('both','university') ] ]
    | corpus file syntax :
    |   title, ... for location title words not part of the type (e.g. north)
    |   type, ... for location type types (e.g. hospital)
    |   \# for comments
    |   +<phrase> = prefix to location name
    |   <phrase>+ = suffix to location name
    |   +<phrase>+ = can be both prefix and suffix
    |   tokens starting with a '*' indicate that the phrase is not to be used for initial match, but will used for expansion variants
    |   e.g. primary, *school --> matches only '<name> primary' BUT will expand to '<name> primary', '<name> school' since there are other types of school that could match erroneously

    :param str filename: filename of location type corpus
    :param dict dict_geospatial_config: config object returned from geoparse_lib.get_geoparse_config()

    :return: two-element list [ listTitleSet, listTypeSet ] of title variants and type variants
    :rtype: list

    :raises Exception: if dict_geospatial_config is not a dict, a row does not start with 'title' or 'type', or a phrase has no '+' prefix/suffix indicator
    """

    # check args without defaults
    if not isinstance( dict_geospatial_config, dict ) :
        raise Exception( 'invalid dict_geospatial_config' )

    logger = dict_geospatial_config['logger']
    if logger is not None :
        logger.info( 'loading location type corpus ' + filename )

    listTitleSet = []
    listTypeSet = []

    # read each line in document and process it
    # note: use a context manager so the file is closed even if a malformed row raises an exception
    nLineCount = 0
    with codecs.open( filename, 'r', 'utf-8' ) as readHandle :
        for line in readHandle :
            nLineCount = nLineCount + 1

            # remove newline at end (might not have one if last line)
            line = line.rstrip( '\n' )
            line = line.rstrip( '\r' )

            # remove UTF8 byte-order mark at start of file (added for UTF encoded files to indicate if its UTF-8, UTF-16 etc)
            line = line.lstrip( '\ufeff' )

            # skip empty lines, single character lines and comments
            if (len(line) <= 1) or line.startswith('#') :
                continue

            # tokenize using comma delimiter
            # and make lowercase
            listEntries = line.lower().split(',')

            # first entry declares if this row is a set of titles or a set of types
            strRowKind = listEntries[0].strip()
            if strRowKind not in ( 'title', 'type' ) :
                raise Exception( 'row found without a title | type prefix : ' + str(filename) + ' : line ' + str(nLineCount) )

            #
            # create entry by parsing special tokens (comma delimited phrase set)
            #
            listEntryVariants = []
            for strPhrase in listEntries[1:] :

                # parse special characters (order is '*' then leading '+' then trailing '+')
                strPhraseStripped = strPhrase.strip()
                bNoMatch = False
                bPrefix = False
                bSuffix = False

                if strPhraseStripped.startswith('*') :
                    strPhraseStripped = strPhraseStripped[1:]
                    bNoMatch = True
                if strPhraseStripped.startswith('+') :
                    strPhraseStripped = strPhraseStripped[1:]
                    bPrefix = True
                if strPhraseStripped.endswith('+') :
                    strPhraseStripped = strPhraseStripped[:-1]
                    bSuffix = True

                # clean
                strTextClean = soton_corenlppy.common_parse_lib.clean_text( strPhraseStripped, dict_geospatial_config )

                # tokenize phrase
                listTokens = soton_corenlppy.common_parse_lib.tokenize_sentence( strTextClean, dict_geospatial_config )

                # phrases with no tokens are silently ignored
                if len(listTokens) == 0 :
                    continue

                # keep * in front of 1st token of phrases we dont want to match
                if bNoMatch == True :
                    listTokens[0] = '*' + listTokens[0]

                # apply prefix|suffix|both labelling
                if bPrefix and bSuffix :
                    listTokens.insert( 0,'both' )
                elif bSuffix :
                    listTokens.insert( 0,'suffix' )
                elif bPrefix :
                    listTokens.insert( 0,'prefix' )
                else :
                    raise Exception( 'phrase found without a prefix/suffix indicator : ' + str(filename) + ' : line ' + str(nLineCount) )

                # add tokens to list (unique variants only)
                tuplePhrase = tuple( listTokens )
                if tuplePhrase not in listEntryVariants :
                    listEntryVariants.append( tuplePhrase )

            if strRowKind == 'title' :
                listTitleSet.append( listEntryVariants )
            else :
                listTypeSet.append( listEntryVariants )

    # all done
    if logger is not None :
        logger.info( '- ' + str(len(listTitleSet)) + ' unique titles' )
        logger.info( '- ' + str(len(listTypeSet)) + ' unique types' )

    # sort all token sets so the tokens appear in gram order
    # ensuring highest gram gets matched first
    listTitleSet = [ sorted( listVariants, key=len, reverse=True ) for listVariants in listTitleSet ]
    listTypeSet = [ sorted( listVariants, key=len, reverse=True ) for listVariants in listTypeSet ]

    # all done
    return [listTitleSet,listTypeSet]
def calc_geom_index( list_data, index_geom = 4, index_id = 2, index_osm_tag = 5 ) :
    """
    compile an index of shapely geoms from a list of data where one column is a geom. there can be several geom for each osmid as island groups can have a geom per island, but still have a single osmid and name (e.g. Shetland, UK).
    the key for this index will either be the original data list row number OR the value from an ID column if provided.
    a OSM tag column can optionally be provided to append OSM tag data to the end of the geom index entry, which can be useful for determining the location type each geom refers to (e.g. admin region, road)

    :param list list_data: list of data rows where one of the column contains a serialized OpenGIS geom
    :param int index_geom: column index in list of data for geom
    :param int index_id: column index in list of data for id (can be None)
    :param int index_osm_tag: column index in list of data for OSM tag dict (can be None)

    :return: dict of { id : [ ( shapely_geom_prepared, shapely_envelope_prepared, shapely_geom, shapely_representative_point, dict_OSM_tag, geom_serialized ), ...] }
    :rtype: dict

    :raises Exception: if an argument has the wrong type, a column index is out of range for the first row, or a row holds a geom / ID / OSM tag value of an unsupported type
    """

    # check args without defaults
    if not isinstance( list_data, list ) :
        raise Exception( 'invalid list_data' )
    if not isinstance( index_geom, int ) :
        raise Exception( 'invalid index_geom' )

    # anything to do ?
    if len(list_data) == 0 :
        return {}

    # enable speedups if supported
    # note: this is important as it enabled c compiled functions to replace py functions
    # which can give an order of magnitude speedup
    # NOTE(review): shapely.speedups is reported as deprecated in Shapely 2.0 - confirm against the Shapely version in use
    if shapely.speedups.available == True :
        shapely.speedups.enable()

    # check index OK (only the first row is checked)
    nColumns = len(list_data[0])
    if nColumns <= index_geom :
        raise Exception( 'geom index >= len(source data)' )
    if (index_id is not None) and (nColumns <= index_id) :
        raise Exception( 'ID index >= len(source data)' )
    if (index_osm_tag is not None) and (nColumns <= index_osm_tag) :
        raise Exception( 'OSM tag index >= len(source data)' )

    dictGeomIndex = {}
    for nDocIndex, listRow in enumerate( list_data ) :

        # get source geom
        strGeom = listRow[index_geom]
        if not isinstance( strGeom, str ) :
            raise Exception( 'object type of geom not str,unicode : ' + str(type(strGeom)) )

        # get ID tuple (row number is used if there is no ID column)
        if index_id is not None :
            objID = listRow[index_id]
            if isinstance( objID, list ) :
                tupleID = tuple( objID )
            elif isinstance( objID, tuple ) :
                tupleID = objID
            elif isinstance( objID, str ) :
                tupleID = ( objID, )
            else :
                raise Exception( 'object type of ID not list,tuple,str,unicode' )
        else :
            tupleID = (nDocIndex,)

        # get OSM tag dict
        if index_osm_tag is not None :
            dictOSMTag = listRow[index_osm_tag]
            if not isinstance( dictOSMTag, dict ) :
                raise Exception( 'object type of OSM tag not dict' )
        else :
            dictOSMTag = {}

        # add shape to dict if not already in index (same serialized geom under the same ID)
        # note: use a prepared object for more efficient batch processing later
        listIndexed = dictGeomIndex.setdefault( tupleID, [] )
        if any( entry[5] == strGeom for entry in listIndexed ) :
            continue

        shapeGeom = shapely.wkt.loads( strGeom )
        shapeGeomSimple = shapeGeom.envelope
        shapeGeomPrep = shapely.prepared.prep( shapeGeom )
        shapeGeomSimplePrep = shapely.prepared.prep( shapeGeomSimple )
        pointRepresentative = shapeGeom.representative_point()
        listIndexed.append( (shapeGeomPrep, shapeGeomSimplePrep, shapeGeom, pointRepresentative, dictOSMTag, strGeom) )

    # return geom index
    return dictGeomIndex
def geoparse_token_set( token_set, dict_inverted_index, dict_geospatial_config ) :
    """
    geoparse token sets using a set of cached locations. no location disambiguation is performed here so all possible location matches will be returned for each token

    :param list token_set: list of tokens to geoparse
    :param dict dict_inverted_index: inverted index of cached locations from soton_corenlppy.common_parse_lib.calc_inverted_index()
    :param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()

    :return: list of location indexes for each matched phrase [ [ (token_start, token_end, set(doc_index1, doc_index2 ...), matched_tokens), ... ], ... ]
    :rtype: list
    """

    # argument type checks
    if not isinstance( token_set, list ) :
        raise Exception( 'invalid token_set' )
    if not isinstance( dict_inverted_index, dict ) :
        raise Exception( 'invalid dict_inverted_index' )
    if not isinstance( dict_geospatial_config, dict ) :
        raise Exception( 'invalid dict_geospatial_config' )

    listAllMatches = []
    listSentSeps = dict_geospatial_config['sent_token_seps']

    # first names that must not directly precede a single token location (e.g. victoria derbyshire)
    setFirstNames = dict_geospatial_config['first_names']

    for listTokens in token_set :
        listMatches = []
        nGramLimit = dict_geospatial_config['max_gram']

        # phrases are generated here (rather than via an ngram helper) so the token positions are preserved
        for nStart in range( len(listTokens) ) :
            # phrase end is exclusive and never runs past the end of the token list
            nLastEnd = min( nStart + nGramLimit, len(listTokens) )
            for nEnd in range( nStart + 1, nLastEnd + 1 ) :
                tupleCandidate = tuple( listTokens[ nStart:nEnd ] )

                # a phrase containing a sentence separator spans a sentence, and so
                # will every longer phrase from this start position, so stop extending
                if any( strTok in listSentSeps for strTok in tupleCandidate ) :
                    break

                # no cached location has this name
                if tupleCandidate not in dict_inverted_index :
                    continue

                # reject single token locations preceded by a first name as its probably a persons full name
                # e.g. this is victoria derbyshire = NO
                # e.g. will new york city be great = YES
                bPersonName = (nStart >= 1) and (len(tupleCandidate) == 1) and (listTokens[ nStart - 1 ] in setFirstNames)
                if not bPersonName :
                    listMatches.append( (nStart, nEnd - 1, dict_inverted_index[ tupleCandidate ], tupleCandidate) )

        listAllMatches.append( listMatches )

    return listAllMatches
def reverse_geocode_geom( list_geom, dict_geom_index, dict_geospatial_config ) :
    """
    reverse geocode a list of OpenGIS geom objects and return all indexed geoms that intersect with each geom in this list

    | note: if OSM tag info is specified in the dictGeomIndex then the geom with the highest admin level will be returned (i.e. most specific location returned, so road before suburb before city before country)

    :param list list_geom: list of serialized OpenGIS geoms to geoparse
    :param dict dict_geom_index: geom index from geo_parse_lib.calc_geom_index()
    :param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()

    :return: a set for each geom checked, with the set containing the ids of any intersecting geoms e.g. [ set( tuple_osmid1, tuple_osmid2 ... ), ... ]
    :rtype: list

    :raises Exception: if list_geom is not a list or either of the other arguments is not a dict
    """

    # check args without defaults
    if not isinstance( list_geom, list ) :
        raise Exception( 'invalid list_geom' )
    if not isinstance( dict_geom_index, dict ) :
        raise Exception( 'invalid dict_geom_index' )
    if not isinstance( dict_geospatial_config, dict ) :
        raise Exception( 'invalid dict_geospatial_config' )

    listResult = []
    for strGeom in list_geom :

        # (id, admin level) pairs of every indexed geom that intersects this geom
        setEntry = set()
        nTopAdminLevel = 0

        # if no geom no matches (an empty set is still added to keep results aligned with list_geom)
        if (strGeom is not None) and (len(strGeom) > 0) :

            # make the shape and do the slow shapely intersects() function calls
            shapeGeom = shapely.wkt.loads( strGeom )

            for tupleID, listGeoms in dict_geom_index.items() :
                for tupleGeomData in listGeoms :
                    shapeLocPrep = tupleGeomData[0]
                    shapeLocSimplePrep = tupleGeomData[1]
                    dictOSMTag = tupleGeomData[4]

                    # optimization test results (ranked in order of speed - all very similar)
                    # (1) envelope_prep.intersects() then geom_prep.intersects()
                    # (2) geom_prep.intersects()
                    # (3) geom.within()
                    # (4) envelope_prep.contains() then geom_prep.contains() [intersects for polygons]
                    # (5) geom_prep.contains() [intersects for polygons]

                    # use intersects for polygon to polygon checks
                    # check the prepared envelope first as a quick test before using the full prepared polygon
                    if not shapeLocSimplePrep.intersects( shapeGeom ) :
                        continue
                    if not shapeLocPrep.intersects( shapeGeom ) :
                        continue

                    # geoms without an admin level tag are given level 100
                    if 'admin level' in dictOSMTag :
                        nAdminLevel = int( dictOSMTag['admin level'] )
                    else :
                        nAdminLevel = 100

                    # a set ignores an osmid we have seen before at this level (with a different geom)
                    setEntry.add( (tupleID,nAdminLevel) )
                    if nAdminLevel > nTopAdminLevel :
                        nTopAdminLevel = nAdminLevel

        # only keep top OSM admin level results
        # since we want the most specific location to be matched NOT the super region
        # e.g. wall street > NY > USA
        listResult.append( { tupleID for (tupleID, nAdminLevel) in setEntry if nAdminLevel == nTopAdminLevel } )

    # all done
    return listResult
def create_matched_location_list( list_match, cached_locations, osmid_lookup ) :
    """
    create a list of locations based on a set of matches and the original cached location table

    :param list list_match: list of location matches from geo_parse_lib.geoparse_token_set()
    :param list cached_locations: list of cached locations from geo_preprocess_lib.cache_preprocessed_locations()
    :param dict osmid_lookup: lookup table mapping an osmid to a set of rows in the cached locations from geo_parse_lib.calc_osmid_lookup()

    :return: list of matched locations with all of the geom information e.g. [[<loc_id>, <token_start>, <token_end>, loc_tokens, geom, (<osm_id>, ...), {<osm_tag>:<value>}*N_tags, (<osm_id_parent>, ...)] ...]
    :rtype: list
    """

    # argument type checks
    if not isinstance( list_match, list ) :
        raise Exception( 'invalid list_match' )
    if not isinstance( cached_locations, list ) :
        raise Exception( 'invalid cached_locations' )
    if not isinstance( osmid_lookup, dict ) :
        raise Exception( 'invalid osmid_lookup' )

    # one annotated entry per (match, osmid, cached location row) combination
    # match columns used : 0 = token start, 1 = token end, 2 = matched osmid tuples, 3 = matched tokens
    # cached location columns used : 0, 4, 2, 5, 3 (in output order)
    listAnnotated = []
    for tupleMatch in list_match :
        for tupleOSMIDs in tupleMatch[2] :
            listAnnotated.extend(
                [
                    listRow[0],
                    tupleMatch[0],
                    tupleMatch[1],
                    tupleMatch[3],
                    listRow[4],
                    listRow[2],
                    listRow[5],
                    listRow[3]
                ]
                for listRow in ( cached_locations[nRow] for nRow in osmid_lookup[ tupleOSMIDs ] )
            )

    return listAnnotated
[docs]def calc_location_confidence( list_loc, dict_geospatial_config, index_token_start = 1, index_token_end = 2, index_osm_id = 5, index_osm_parents = 7, index_osm_tags = 6, semantic_distance = 6, index_geom = 4, geom_distance = 0.25, index_loc_tokens = 3, confidence_tests = (1,2,3,4), geom_context = None, geom_cache = {} ) :
"""
calculate confidence values for a set of location matches originating from a concatenated set of geo_parse_lib.geoparse_token_set() results
:param list list_loc: list of locations with geom information from geo_parse_lib.create_matched_location_list()
:param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()
:param int index_token_start: index of matched token start position in list_loc
:param int index_token_end: index of matched token end position in list_loc
:param int index_osm_id: index of OSM ID in list_loc
:param int index_osm_parents: index of OSM ID of super regions to this matches location in list_loc
:param int index_osm_tags: index of OSM tags in list_loc
:param int semantic_distance: number of tokens (left and right) to look for semantically nearby location checks e.g. 'London in UK'
:param int index_geom: index of serialized OpenGIS geom in list_loc
:param int geom_distance: distance for shapely distance check (in degrees)
:param int index_loc_tokens: index of matched loc tokens
:param int confidence_tests: confidence check tests to run when calculating a confidence value. 1 = token subsumption, 2 = nearby parent region, 3 = nearby locations and nearby geotag, 4 = general before specific
:param dict geom_cache: cache of geom checks with distance and intersection results to avoid running the same shapely checks twice. this cache will be populated with any new geoms that are checked using shapely so might get large over time. e.g. dict{ strGeom : [ set(close_tuple_IDs), set(not_close_tuple_IDs), set(intersects_tuple_IDs), set(not_intersects_tuple_IDs) ] }
:return: a list of confidence values (0..300) for each location in list_loc. locations with a common token can be ranked by confidence and the highest value taken. a confidence of 0 means the location should be rejected regardless. closeby locations scores 2+. super regions present in text scores 10+. geocontext intersects location scores 200+ and closeness scores 100+
:rtype: list
"""
# check args without defaults
if not isinstance( list_loc, list ) :
raise Exception( 'invalid list_loc' )
if not isinstance( dict_geospatial_config, dict ) :
raise Exception( 'invalid dict_geospatial_config' )
# checks
if semantic_distance < 1 :
raise Exception( 'semantic distance < 1' )
if len(list_loc) == 0 :
return []
# copy then append the index as we will sort it next and we dont want to lose the original order for result!
listLocs = copy.deepcopy( list_loc )
listResult = []
nOriginalIndex = len(listLocs[0])
for nIndexCount in range(len(listLocs)) :
# add master index to end of copy of the source list
listLocs[nIndexCount].append( nIndexCount )
# 0 confidence for subsumed tokens is default
listResult.append( 0 )
# debug
"""
bTEST = False
for nLocIndex1 in range(len(listLocs)) :
if u'scotland' in listLocs[nLocIndex1][3].lower() :
bTEST = True
if bTEST == True :
for nLocIndex1 in range(len(listLocs)) :
dict_geospatial_config['logger'].info('TEST0 = ' + str(listLocs[nLocIndex1][nIndexTokenStart]) + ':' + str(listLocs[nLocIndex1][nIndexTokenEnd]) + ' = ' + str(listLocs[nLocIndex1][3]) + ' = ' + str(listResult[ listLocs[nLocIndex1][nOriginalIndex] ]) )
"""
# sort list by 1st phrase size and 2nd token position and 3rd length of way (big polygons are probably better than single points)
# key = (1 + token end - token start) + 1.0/(2+token start) + 0.000001 * strGeom.count(',')
if (not isinstance( listLocs[0][index_geom], str )) and (not isinstance( listLocs[0][index_geom], str )) :
raise Exception( 'loc list geom index value is not of type <str> or <unicode>' )
listLocs = sorted( listLocs, key=lambda entry: (1 + entry[index_token_end] - entry[index_token_start]) + 1.0/(2+entry[index_token_start]) + 0.000001 * entry[index_geom].count(','), reverse=True )
# create geom index ONLY if its needed as shape work is slow
dictGeomIndex = None
# if we have a geom_context make it into a shape
shapeGeomContext = None
shapeGeomContextPoint = None
if geom_context != None :
shapeGeomContext = shapely.wkt.loads( geom_context )
shapeGeomContextPoint = shapeGeomContext.representative_point()
# phase 1 - find valid phrases and avoid subsumption
if 1 in confidence_tests :
for nLocIndex1 in range(len(listLocs)) :
# check index types for safety
if not isinstance( listLocs[nLocIndex1][index_token_start], int ) :
raise Exception( 'loc list token start index value is not of type <int>' )
if not isinstance( listLocs[nLocIndex1][index_token_end], int ) :
raise Exception( 'loc list token end index value is not of type <int>' )
if (not isinstance( listLocs[nLocIndex1][index_osm_id], tuple )) and (not isinstance( listLocs[nLocIndex1][index_osm_id], list )) :
raise Exception( 'loc list OSM ID array index value is not of type <tuple>' )
if (not isinstance( listLocs[nLocIndex1][index_osm_parents], tuple )) and (not isinstance( listLocs[nLocIndex1][index_osm_parents], list )) :
raise Exception( 'loc list parent OSM ID array index value is not of type <tuple>' )
# get token range of this location
nTokenStart = listLocs[nLocIndex1][index_token_start]
nTokenEnd = listLocs[nLocIndex1][index_token_end]
nGram = 1 + nTokenEnd - nTokenStart
# calc initial confidence
# reject subsumed tokens that have already been used for a higher gram phrase match
# note: allow tokens to overlap if same gram as these could be perfectly valid matches
# e.g. Russia, RU and Russia, Ohio, USA are 1 gram and will overlap (completely) => allow both to match with confidence 1
# i.e. subsumption test
# note: this works because we have already sorted the list by phrase gram size so we get larger ones first
bOverlap = False
for nLocIndex2 in range(nLocIndex1) :
# get token range of this previous location
nTokenStartPrevious = listLocs[nLocIndex2][index_token_start]
nTokenEndPrevious = listLocs[nLocIndex2][index_token_end]
nGramPrevious = 1 + nTokenEndPrevious - nTokenStartPrevious
if nGramPrevious > nGram :
if (nTokenEnd >= nTokenStartPrevious) and (nTokenStart <= nTokenEndPrevious) :
bOverlap = True
break
if bOverlap == False :
# 1 confidence for valid phrases (we will count parents in second phase)
listResult[ listLocs[nLocIndex1][nOriginalIndex] ] = 1
# debug
"""
for nLocIndex1 in range(len(listLocs)) :
dict_geospatial_config['logger'].info('TEST1 = ' + str(listLocs[nLocIndex1][1]) + ' : ' + str(listLocs[nLocIndex1][index_osm_id]) + ':' + str(listLocs[nLocIndex1][2]) + ' = ' + str(listLocs[nLocIndex1][3]) + ' = ' + str(listResult[ listLocs[nLocIndex1][nOriginalIndex] ]) )
"""
# phase 2 - parent check (worth 10)
# if OSMID admim parents specified check for them and add conf for each one nearby
# note: ignore geotags for this check as closeness to geotag is checked in phase 3
if 2 in confidence_tests :
for nLocIndex1 in range(len(listLocs)) :
nTokenStart = listLocs[nLocIndex1][index_token_start]
nTokenEnd = listLocs[nLocIndex1][index_token_end]
tupleOSMID1 = tuple( listLocs[nLocIndex1][index_osm_id] )
tupleLocPhrase1 = tuple( listLocs[nLocIndex1][index_loc_tokens] )
# dont check ourselves
setOSMIDChecked = set([tupleOSMID1])
# dict_geospatial_config['logger'].info( 'PARENT CHECK1 ' + repr(tupleOSMID1) )
if listResult[ listLocs[nLocIndex1][nOriginalIndex] ] > 0 :
nConf = listResult[ listLocs[nLocIndex1][nOriginalIndex] ]
for nLocIndex2 in range(len(listLocs)) :
nTokenStart2 = listLocs[nLocIndex2][index_token_start]
nTokenEnd2 = listLocs[nLocIndex2][index_token_end]
tupleOSMID2 = tuple( listLocs[nLocIndex2][index_osm_id] )
tupleLocPhrase2 = tuple( listLocs[nLocIndex2][index_loc_tokens] )
# do not allow use locs with the same name to act as parents
# e.g. island groups like shetlands OR admin polygon & admin_centre node with same name
if tupleLocPhrase1 == tupleLocPhrase2 :
continue
# dict_geospatial_config['logger'].info( 'PARENT CHECK2 ' + repr(tupleOSMID2) )
if (not tupleOSMID2 in setOSMIDChecked) and (listResult[ listLocs[nLocIndex2][nOriginalIndex] ] > 0):
# dont check same OSMID many times (e.g. if it has many geoms)
setOSMIDChecked.add( tupleOSMID2 )
# is this location within the semantic distance to check for parent? but not the same phrase
bCheck = False
if (nTokenStart2 > nTokenEnd) and (nTokenStart2 <= nTokenEnd + semantic_distance) :
bCheck = True
if (nTokenEnd2 < nTokenStart) and (nTokenEnd2 >= nTokenStart - semantic_distance) :
bCheck = True
# check all OSMID's of this possible parent against the declared parents in primary location
if bCheck == True :
setPrimaryOSMID = set( listLocs[nLocIndex1][index_osm_parents] )
setSecondaryOSMID = set( listLocs[nLocIndex2][index_osm_id] )
# if we have admin parents use them
if len( setPrimaryOSMID & setSecondaryOSMID ) > 0 :
nConf = nConf + 10
#dict_geospatial_config['logger'].info( 'PARENT CHECK osmid = ' + repr(tupleOSMID1) + ' conf = ' + str(nConf) )
listResult[ listLocs[nLocIndex1][nOriginalIndex] ] = nConf
# debug
"""
for nLocIndex1 in range(len(listLocs)) :
dict_geospatial_config['logger'].info('TEST2 = ' + str(listLocs[nLocIndex1][1]) + ' : ' + str(listLocs[nLocIndex1][index_osm_id]) + ':' + str(listLocs[nLocIndex1][2]) + ' = ' + str(listLocs[nLocIndex1][3]) + ' = ' + str(listResult[ listLocs[nLocIndex1][nOriginalIndex] ]) )
"""
# phase 3 - check inter-loc closeness (geocontext intersect worth 100, nearby location worth 1)
# check all loc geom
# if loc2 is within a semantic distance and != same name (i.e. try to avoid liverpool, liverpool)
# if loc2 is closeby ==> add confidence
# if loc2 is without a textual ref
# if loc2 is closeby OR intersects ==> add confidence
# note: shapely distance() is slow so we only compare the distance() between representative points
# which is fine for streets and buildings but inaccurate for large regions (e.g. Russia)
# so we need BOTH intersects and closeby tests for non-textual refs
# note: we do not have original phrase so can only check for common tag:name not actual text phrase
if 3 in confidence_tests :
# loop on all locs (1)
for nLocIndex1 in range(len(listLocs)) :
if listResult[ listLocs[nLocIndex1][nOriginalIndex] ] > 0 :
# get existing conf value
nConf = listResult[ listLocs[nLocIndex1][nOriginalIndex] ]
tupleOSMID1 = tuple( listLocs[nLocIndex1][index_osm_id] )
nTokenEnd1 = listLocs[nLocIndex1][index_token_end]
nTokenStart1 = listLocs[nLocIndex1][index_token_start]
tupleLocPhrase1 = tuple( listLocs[nLocIndex1][index_loc_tokens] )
strGeom1 = listLocs[nLocIndex1][index_geom]
# make sure this geom appears in cache
if not strGeom1 in geom_cache :
geom_cache[strGeom1] = [ set([]),set([]),set([]),set([]) ]
# check geom_context (if we have one) for both intersection and distance
# context_geom matches add 100 to confidence as this is a strong indication the loc is correct and should override others evidence
if shapeGeomContext != None :
if dictGeomIndex == None :
# index location geoms using the OSMID array as a unique identifier to avoid calculating duplicate geoms
# dictGeomIndex = dict <- calc_geom_index() = { tupleOSMID : [ ( shapely_geom_prepared, shapely_envelope, shapely_geom, shapely_representative_point, dict_OSM_tag, geom_serialized ), ... ] }
# dictGeomResultsCache = { tupleOSMID : [ set(close_tuple_IDs), set(not_close_tuple_IDs), set(intersects_tuple_IDs), set(not_intersects_tuple_IDs), set(serialized_geom) ] }
# note: only create it when its needed as its slow to do geom work
dictGeomIndex = calc_geom_index( listLocs, index_geom = index_geom, index_id = index_osm_id, index_osm_tag = None )
# get the polygon for this location (osmids can have multiple polygons so get right one e.g. island groups)
shapeLoc1 = None
shapePoint1 = None
for tupleGeomData in dictGeomIndex[tupleOSMID1] :
if strGeom1 == tupleGeomData[5] :
# use prepared geom for shapely match efficiency
shapeLoc1 = tupleGeomData[0]
# use representative point for distance() as using the full geom is much too slow
# for complex regions like russia (e.g. 1 min calc time which does not scale to 1000's of calls)
shapePoint1 = tupleGeomData[3]
break
if shapeLoc1 != None :
# calc intersections between two shapes. if so add to confidence
if (strGeom1.lower().startswith( 'polygon(' )) and (shapeLoc1.intersects( shapeGeomContext ) == True) :
nConf = nConf + 200
# calc distance between two shapes and check if it is within a given closeness. if so add to confidence
if shapePoint1.distance( shapeGeomContextPoint ) <= geom_distance :
nConf = nConf + 100
# loop on all locs (2)
for nLocIndex2 in range(len(listLocs)) :
if (nLocIndex2 != nLocIndex1) and (nTokenStart1 != nTokenStart2) and (nTokenEnd1 != nTokenEnd2) :
if listResult[ listLocs[nLocIndex2][nOriginalIndex] ] > 0 :
tupleOSMID2 = tuple( listLocs[nLocIndex2][index_osm_id] )
nTokenStart2 = listLocs[nLocIndex2][index_token_start]
nTokenEnd2 = listLocs[nLocIndex2][index_token_end]
tupleLocPhrase2 = tuple( listLocs[nLocIndex2][index_loc_tokens] )
strGeom2 = listLocs[nLocIndex2][index_geom]
# do not add confidence for spatially close locations with the same name
# e.g. island groups like shetlands OR admin polygon & admin_centre node with same name
if tupleLocPhrase1 == tupleLocPhrase2 :
continue
# make sure this geom appears in cache
if not strGeom2 in geom_cache :
geom_cache[strGeom2] = [ set([]),set([]),set([]),set([]) ]
# is this location within the semantic distance to check for parent? (but not the same phrase)
# note: always compare distance to non-text geoms as semantic distance has no meaning for these
# and a location mention nearby a geotag will disambiguate that location mention nicely
bCheckDistance = False
bCheckIntersects = False
if (nTokenStart2 > nTokenEnd1) and (nTokenStart2 <= nTokenEnd1 + semantic_distance) :
bCheckDistance = True
if (nTokenEnd2 < nTokenStart1) and (nTokenEnd2 >= nTokenStart1 - semantic_distance) :
bCheckDistance = True
# check if loc geom is actually the same OSMID as the target
# - if so dont check as this would be self-confirming
# note: use set intersection as locs can have multiple OSMID's (e.g. group of islands)
setOSMID1 = set( listLocs[nLocIndex1][index_osm_id] )
setOSMID2 = set( listLocs[nLocIndex2][index_osm_id] )
if len( setOSMID1 & setOSMID2 ) > 0 :
bCheckDistance = False
bCheckIntersects = False
continue
# check parent lists
# - if loc geom is a parent then dont check it as phase 2 subsumption test will have added confidence already (worth 10)
# - if loc geom is a child then add confidence
# note: this avoids a needless lookup (distance & intersection)
if bCheckIntersects == True or bCheckDistance == True :
setParentOSMID1 = set( listLocs[nLocIndex1][index_osm_parents] )
setParentOSMID2 = set( listLocs[nLocIndex2][index_osm_parents] )
if len( setParentOSMID1 ) > 0 :
if len( setParentOSMID1 & setOSMID2 ) > 0 :
bCheckDistance = False
bCheckIntersects = False
continue
if len( setParentOSMID2 ) > 0 :
if len( setParentOSMID2 & setOSMID1 ) > 0 :
nConf = nConf + 1
# break so we only add 1 to conf for a closeness match
break
# check intersection
if bCheckIntersects == True :
# have we done this geom comparison before? if so reuse result
# otherwise do a shape comparison to check for geographic subsumption
listGeomResult = geom_cache[strGeom1]
if tupleOSMID2 in listGeomResult[2] :
nConf = nConf + 1
#dict_geospatial_config['logger'].info( 'GEO_INTERSECT (cached) = ' + repr( listLocs[nLocIndex1][nOriginalIndex] ) + ' close_to ' + repr( listLocs[nLocIndex2][nOriginalIndex] ) )
# break so we only add 1 to conf for a intersects match
break
elif tupleOSMID2 in listGeomResult[3] :
#dict_geospatial_config['logger'].info( 'NOT GEO_INTERSECT (cached) = ' + repr( listLocs[nLocIndex1][nOriginalIndex] ) + ' close_to ' + repr( listLocs[nLocIndex2][nOriginalIndex] ) )
pass
else :
if dictGeomIndex == None :
# index location geoms using the OSMID array as a unique identifier to avoid calculating duplicate geoms
# dictGeomIndex = dict <- calc_geom_index() = { tupleOSMID : [ ( shapely_geom_prepared, shapely_envelope, shapely_geom, shapely_representative_point, dict_OSM_tag, geom_serialized ), ... ] }
# dictGeomResultsCache = { tupleOSMID : [ set(close_tuple_IDs), set(not_close_tuple_IDs), set(intersects_tuple_IDs), set(not_intersects_tuple_IDs), set(serialized_geom) ] }
# note: only create it when its needed as its slow to do geom work
dictGeomIndex = calc_geom_index( listLocs, index_geom = index_geom, index_id = index_osm_id, index_osm_tag = None )
# get the polygon for this location (osmids can have multiple polygons so get right one e.g. island groups)
shapeLoc1 = None
shapePoint1 = None
for tupleGeomData in dictGeomIndex[tupleOSMID1] :
if strGeom1 == tupleGeomData[5] :
# use prepared geom for shapely match efficiency
shapeLoc1 = tupleGeomData[0]
# use representative point for distance() as using the full geom is much too slow
# for complex regions like russia (e.g. 1 min calc time which does not scale to 1000's of calls)
shapePoint1 = tupleGeomData[3]
break
# get the polygon for this location (osmids can have multiple polygons so get right one e.g. island groups)
shapeLoc2 = None
shapePoint2 = None
for tupleGeomData in dictGeomIndex[tupleOSMID2] :
if strGeom2 == tupleGeomData[5] :
# use normal geom (shapely prepared shape intersect function needs a normal shape to work on)
shapeLoc2 = tupleGeomData[2]
# use representative point for distance() as using the full geom is much too slow
# for complex regions like russia (e.g. 1 min calc time which does not scale to 1000's of calls)
shapePoint2 = tupleGeomData[3]
break
if (shapeLoc1 != None) and (shapeLoc2 != None) :
# debug
#dict_geospatial_config['logger'].info( 'TEST GEO 2 = ' + repr( [tupleOSMID1,tupleOSMID2] ) )
#dict_geospatial_config['logger'].info( 'TEST 3 = ' + repr( [setParentOSMID1,setOSMID2] ) )
#dict_geospatial_config['logger'].info( 'TEST 4 = ' + repr( [nTokenStart1,nTokenStart2] ) )
#dict_geospatial_config['logger'].info( 'phase3 shapely intersect check = ' + repr( [tupleOSMID1,tupleOSMID2] ) )
# calc intersections between two shapes. if so add to confidence
if shapeLoc1.intersects( shapeLoc2 ) == True :
nConf = nConf + 1
#dict_geospatial_config['logger'].info('GEO_INTERSECT = ' + repr( [listLocs[nLocIndex1][nOriginalIndex],listLocs[nLocIndex2][nOriginalIndex]] ))
# remember loc1 -> loc2
listGeomResult[2].add( tupleOSMID2 )
geom_cache[strGeom1] = listGeomResult
# remember loc2 -> loc1
listGeomResult = geom_cache[strGeom2]
if not tupleOSMID1 in listGeomResult[2] :
listGeomResult[2].add( tupleOSMID1 )
geom_cache[strGeom2] = listGeomResult
# break so we only add 1 to conf for a closeness match
break
else :
# dict_geospatial_config['logger'].info('NOT GEO_LOC = ' + repr( [listLocs[nLocIndex1][nOriginalIndex],listLocs[nLocIndex2][nOriginalIndex]] ))
# remember NOT loc1 -> loc2
listGeomResult[3].add( tupleOSMID2 )
geom_cache[tupleOSMID1] = listGeomResult
# remember NOT loc2 -> loc1
listGeomResult = geom_cache[strGeom2]
if not tupleOSMID1 in listGeomResult[2] :
listGeomResult[3].add( tupleOSMID1 )
geom_cache[strGeom2] = listGeomResult
# check distance
if bCheckDistance == True :
# have we done this geom comparison before? if so reuse result
# otherwise do a shape comparison to check for geographic subsumption
listGeomResult = geom_cache[strGeom1]
if tupleOSMID2 in listGeomResult[0] :
nConf = nConf + 1
#dict_geospatial_config['logger'].info( 'GEO_LOC (cached) = ' + repr( listLocs[nLocIndex1][nOriginalIndex] ) + ' close_to ' + repr( listLocs[nLocIndex2][nOriginalIndex] ) )
# break so we only add 1 to conf for a intersects match
break
elif tupleOSMID2 in listGeomResult[1] :
#dict_geospatial_config['logger'].info( 'NOT GEO_LOC (cached) = ' + repr( listLocs[nLocIndex1][nOriginalIndex] ) + ' close_to ' + repr( listLocs[nLocIndex2][nOriginalIndex] ) )
pass
else :
if dictGeomIndex == None :
# index location geoms using the OSMID array as a unique identifier to avoid calculating duplicate geoms
# dictGeomIndex = dict <- calc_geom_index() = { tupleOSMID : [ ( shapely_geom_prepared, shapely_envelope, shapely_geom, shapely_representative_point, dict_OSM_tag, geom_serialized ), ... ] }
# dictGeomResultsCache = { tupleOSMID : [ set(close_tuple_IDs), set(not_close_tuple_IDs), set(intersects_tuple_IDs), set(not_intersects_tuple_IDs), set(serialized_geom) ] }
# note: only create it when its needed as its slow to do geom work
dictGeomIndex = calc_geom_index( listLocs, index_geom = index_geom, index_id = index_osm_id, index_osm_tag = None )
# get the polygon for this location (osmids can have multiple polygons so get right one e.g. island groups)
shapeLoc1 = None
shapePoint1 = None
for tupleGeomData in dictGeomIndex[tupleOSMID1] :
if strGeom1 == tupleGeomData[5] :
# use prepared geom for shapely match efficiency
shapeLoc1 = tupleGeomData[0]
# use representative point for distance() as using the full geom is much too slow
# for complex regions like russia (e.g. 1 min calc time which does not scale to 1000's of calls)
shapePoint1 = tupleGeomData[3]
break
# get the polygon for this location (osmids can have multiple polygons so get right one e.g. island groups)
shapeLoc2 = None
shapePoint2 = None
for tupleGeomData in dictGeomIndex[tupleOSMID2] :
if strGeom2 == tupleGeomData[5] :
# use normal geom (shapely prepared shape intersect function needs a normal shape to work on)
shapeLoc2 = tupleGeomData[2]
# use representative point for distance() as using the full geom is much too slow
# for complex regions like russia (e.g. 1 min calc time which does not scale to 1000's of calls)
shapePoint2 = tupleGeomData[3]
break
if (shapeLoc1 != None) and (shapeLoc2 != None) :
# debug
#dict_geospatial_config['logger'].info( 'TEST GEO 2 = ' + repr( [tupleOSMID1,tupleOSMID2] ) )
#dict_geospatial_config['logger'].info( 'TEST 3 = ' + repr( [setParentOSMID1,setOSMID2] ) )
#dict_geospatial_config['logger'].info( 'TEST 4 = ' + repr( [nTokenStart1,nTokenStart2] ) )
#dict_geospatial_config['logger'].info( 'phase3 shapely distance check = ' + repr( [tupleOSMID1,tupleOSMID2] ) )
# calc distance between two shapes and check if it is within a
# given closeness. if so add to confidence
if shapePoint1.distance( shapePoint2 ) <= geom_distance :
nConf = nConf + 1
#dict_geospatial_config['logger'].info('GEO_LOC = ' + repr( [listLocs[nLocIndex1][nOriginalIndex],listLocs[nLocIndex2][nOriginalIndex]] ))
# remember loc1 -> loc2
listGeomResult[0].add( tupleOSMID2 )
geom_cache[strGeom1] = listGeomResult
# remember loc2 -> loc1
listGeomResult = geom_cache[strGeom2]
if not tupleOSMID1 in listGeomResult[0] :
listGeomResult[0].add( tupleOSMID1 )
geom_cache[strGeom2] = listGeomResult
# break so we only add 1 to conf for a closeness match
break
else :
# dict_geospatial_config['logger'].info('NOT GEO_LOC = ' + repr( [listLocs[nLocIndex1][nOriginalIndex],listLocs[nLocIndex2][nOriginalIndex]] ))
# remember NOT loc1 -> loc2
listGeomResult[1].add( tupleOSMID2 )
geom_cache[strGeom1] = listGeomResult
# remember NOT loc2 -> loc1
listGeomResult = geom_cache[strGeom2]
if not tupleOSMID1 in listGeomResult[1] :
listGeomResult[1].add( tupleOSMID1 )
geom_cache[strGeom2] = listGeomResult
# assert new conf value
listResult[ listLocs[nLocIndex1][nOriginalIndex] ] = nConf
# debug
"""
for nLocIndex1 in range(len(listLocs)) :
dict_geospatial_config['logger'].info('TEST3 = ' + str(listLocs[nLocIndex1][1]) + ' : ' + str(listLocs[nLocIndex1][index_osm_id]) + ':' + str(listLocs[nLocIndex1][2]) + ' = ' + str(listLocs[nLocIndex1][3]) + ' = ' + str(listResult[ listLocs[nLocIndex1][nOriginalIndex] ]) )
"""
# phase 4 - admin level disambiguation
# for contested tokens choose the location(s) with the lowest OSM admin level
# and discount no admin level choices (buildings, roads) and points (e.g. admin centres which should have a polygon option available anyway)
# so 'London' city in UK is preferred to 'London' province in Canada
if 4 in confidence_tests :
for nLocIndex1 in range(len(listLocs)) :
nTokenStart1 = listLocs[nLocIndex1][index_token_start]
nTokenEnd1 = listLocs[nLocIndex1][index_token_end]
if not isinstance( listLocs[nLocIndex1][index_osm_tags], dict ) :
raise Exception( 'OSM tag array not type <dict>' )
dictOSMTags1 = listLocs[nLocIndex1][index_osm_tags]
tupleOSMID1 = tuple( listLocs[nLocIndex1][index_osm_id] )
strGeom1 = listLocs[nLocIndex1][index_geom]
# if location has an admin level then if we find a location with a lower level set current loc confidence to 0
# note: OSM admin level is from 1 to 10, so use 12 for no value to always fail in comparison
# note: some points have an admin value (e.g. admin centre linked to a cities way) so discount them also as we want the polygon to be chosen over the admin centre point
if 'admin level' in dictOSMTags1 :
nAdminLevel1 = int( dictOSMTags1['admin level'] )
if strGeom1.lower().startswith( 'point(' ) :
nAdminLevel1 = 11
else :
nAdminLevel1 = 12
# get confidence level of current token
nConf1 = listResult[ listLocs[nLocIndex1][nOriginalIndex] ]
if nConf1 > 0 :
for nLocIndex2 in range(len(listLocs)) :
nConf2 = listResult[ listLocs[nLocIndex2][nOriginalIndex] ]
nTokenStart2 = listLocs[nLocIndex2][index_token_start]
nTokenEnd2 = listLocs[nLocIndex2][index_token_end]
tupleOSMID2 = tuple( listLocs[nLocIndex2][index_osm_id] )
if not isinstance( listLocs[nLocIndex2][index_osm_tags], dict ) :
raise Exception( 'OSM tag array not type <dict>' )
dictOSMTags2 = listLocs[nLocIndex2][index_osm_tags]
strGeom2 = listLocs[nLocIndex2][index_geom]
# is this loc for the same token? is the confidence level the same?
# if so use admin_level to disambiguate and zero the confidence of the higher admin level location
if (tupleOSMID1 != tupleOSMID2) and (nTokenStart1 == nTokenStart2) and (nTokenEnd1 == nTokenEnd2) and (nConf1 == nConf2) :
if 'admin level' in dictOSMTags2 :
nAdminLevel2 = int( dictOSMTags2['admin level'] )
if strGeom2.lower().startswith( 'point(' ) :
nAdminLevel2 = 11
else :
nAdminLevel2 = 12
# if admin level of this other loc < admin level of current loc then set the current loc confidence to 0 as we should prefer other loc
if nAdminLevel2 < nAdminLevel1 :
#DEBUG
#dict_geospatial_config['logger'].info( 'DEBUG Phase 4 : ' + repr(tupleOSMID2) + ' admin level ' + str(nAdminLevel2) + ' > ' + repr(tupleOSMID1) + ' admin level ' + str(nAdminLevel1) )
nConf1 = 0
break
# update confidence level of current token (i.e. 0 if we have found a location using same token but with a higher admin level)
listResult[ listLocs[nLocIndex1][nOriginalIndex] ] = nConf1
# debug
"""
for nLocIndex1 in range(len(listLocs)) :
dict_geospatial_config['logger'].info('TEST4 = ' + str(listLocs[nLocIndex1][1]) + ' : ' + str(listLocs[nLocIndex1][index_osm_id]) + ':' + str(listLocs[nLocIndex1][2]) + ' = ' + str(listLocs[nLocIndex1][3]) + ' = ' + str(listResult[ listLocs[nLocIndex1][nOriginalIndex] ]) )
"""
# tidy up
if dictGeomIndex != None :
del dictGeomIndex
# all done
return listResult
def filter_matches_by_confidence( list_loc, dict_geospatial_config, index_token_start = 1, index_token_end = 2, index_osm_id = 5, index_osm_parents = 7, index_osm_tags = 6, semantic_distance = 6, index_geom = 4, geom_distance = 0.25, index_loc_tokens = 3, confidence_tests = (1,2,3,4), geom_context = None, geom_cache = {} ) :
    """
    filter a list of matches, in place, by match confidence using geo_parse_lib.calc_location_confidence() scores. only the highest ranked location(s) for each token range are kept, with the others removed from the list

    the filtering is done in four steps:

    * rows that duplicate an earlier row (same token range, OSM ID and geom) are removed, as are None rows
    * a confidence value is calculated for every remaining row by calc_location_confidence()
    * rows with a confidence of 0 are removed and the confidence value is appended as the last element of every other row
    * for each (token_start, token_end) phrase only the rows holding the highest confidence value are kept (ties are all kept)

    :param list list_loc: list of locations with geom information from geo_parse_lib.create_matched_location_list(). this list is modified in place: low confidence rows are removed and the confidence value is appended to each surviving row.
    :param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()
    :param int index_token_start: index of matched token start position in list_loc
    :param int index_token_end: index of matched token end position in list_loc
    :param int index_osm_id: index of OSM ID in list_loc
    :param int index_osm_parents: index of OSM ID of super regions to this matches location in list_loc
    :param int index_osm_tags: index of OSM tags in list_loc
    :param int semantic_distance: number of tokens (left and right) to look for semantically nearby location checks e.g. 'London in UK'
    :param int index_geom: index of serialized OpenGIS geom in list_loc
    :param int geom_distance: distance for shapely distance check (in degrees)
    :param int index_loc_tokens: index of matched loc tokens
    :param int confidence_tests: confidence check tests to run when calculating a confidence value. 1 = token subsumption, 2 = nearby parent region, 3 = nearby locations and nearby geotag, 4 = general before specific
    :param geom_context: passed unchanged to calc_location_confidence() as its geom_context argument (None = no context geom)
    :param dict geom_cache: cache of geom checks with distance and intersection results to avoid running the same shapely checks twice. this cache will be populated with any new geoms that are checked using shapely so might get large over time. e.g. dict{ strGeom : [ set(close_tuple_IDs), set(not_close_tuple_IDs), set(intersects_tuple_IDs), set(not_intersects_tuple_IDs) ] }. note: the default dict is created once when the function is defined, so every call that omits this argument shares (and keeps growing) the same cache.

    :return: None. the result is the filtered list_loc itself
    """
    # check args without defaults
    if not isinstance(list_loc, list):
        raise Exception('invalid list_loc')
    if not isinstance(dict_geospatial_config, dict):
        raise Exception('invalid dict_geospatial_config')

    # remove exact duplicates (token range, OSM id, geom), keeping the first occurrence.
    # signatures are compared with == (not hashed) as the OSM ID column is not guaranteed to be hashable
    seen_signatures = []
    unique_rows = []
    for row in list_loc:
        if row is None:
            continue
        signature = (row[index_token_start], row[index_token_end], row[index_osm_id], row[index_geom])
        if signature in seen_signatures:
            continue
        seen_signatures.append(signature)
        unique_rows.append(row)
    # slice assignment keeps the caller's list object (the function works in place)
    list_loc[:] = unique_rows

    # calc confidence values for each location match
    list_confidence = calc_location_confidence(
        list_loc = list_loc,
        index_token_start = index_token_start,
        index_token_end = index_token_end,
        index_osm_id = index_osm_id,
        index_osm_parents = index_osm_parents,
        index_osm_tags = index_osm_tags,
        semantic_distance = semantic_distance,
        dict_geospatial_config = dict_geospatial_config,
        index_geom = index_geom,
        geom_distance = geom_distance,
        index_loc_tokens = index_loc_tokens,
        confidence_tests = confidence_tests,
        geom_context = geom_context,
        geom_cache = geom_cache )

    # remove location matches that have a 0 confidence as these are incidental matches of phrase gram tokens within higher gram phrases
    # and for the rest append the conf value to the match list data
    # listLocInfo = [ <source_loc_id>, <token_start>, <token_end>, loc_tokens, geom, (<osm_id>, ...), {<osm_tag>:<value>}*N_tags, (<osm_id_parent>, ...), confidence ]*N_locs
    scored_rows = []
    for row_index, row in enumerate(list_loc):
        confidence = list_confidence[row_index]
        if confidence == 0:
            continue
        row.append(confidence)
        scored_rows.append(row)

    # where we have multiple loc matches for same phrase remove all matches that do not have the highest confidence value
    # e.g. Donetsk, Ukraine + Donetsk, Russia ==> if Ukraine conf higher lose Russia location
    # note: the confidence was appended above so it is always the last element of a row (index -1), whatever the row length.
    # the running maximum starts at 0 so a (non-positive) value can never win a phrase
    best_confidence = {}
    for row in scored_rows:
        phrase = (row[index_token_start], row[index_token_end])
        if row[-1] > best_confidence.get(phrase, 0):
            best_confidence[phrase] = row[-1]

    list_loc[:] = [
        row for row in scored_rows
        if row[-1] >= best_confidence.get((row[index_token_start], row[index_token_end]), 0)
    ]
def filter_matches_by_geom_area( list_loc, dict_geospatial_config, index_token_start = 1, index_token_end = 2, index_osm_id = 5, index_geom = 4, same_osmid_only = False ) :
    """
    filter a list of matches, in place, to favour locations with the largest area (e.g. liverpool city border > liverpool admin centre point, liverpool city in UK > liverpool suburb in AU). this is helpful to choose a single
    match from a list of matches with same confidence as normally people are referring to the larger more populated area

    rows are grouped by their (token_start, token_end) phrase (and additionally by OSM ID if same_osmid_only is True). within each group only the row(s) with the largest geom area are kept; rows with equal largest area are all kept.
    rows for which no shape can be found in the geom index are always removed. the outcome does not depend on the order of rows in list_loc.

    :param list list_loc: list of locations with geom information from geo_parse_lib.create_matched_location_list(). this list is modified in place with rows removed that have a smaller area than another match for the same phrase
    :param dict dict_geospatial_config: config object returned from geo_parse_lib.get_geoparse_config()
    :param int index_token_start: index of matched token start position in list_loc
    :param int index_token_end: index of matched token end position in list_loc
    :param int index_osm_id: index of OSM ID in list_loc
    :param int index_geom: index of serialized OpenGIS geom in list_loc
    :param bool same_osmid_only: if True limit loc removal to same OSMIDs i.e. remove smaller geoms for same OSMID if several geoms matched (e.g. admin nodes and city polygons or several island polygons)

    :return: None. the result is the filtered list_loc itself
    """
    # check args without defaults
    if not isinstance(list_loc, list):
        raise Exception('invalid list_loc')
    if not isinstance(dict_geospatial_config, dict):
        raise Exception('invalid dict_geospatial_config')

    # drop voided rows up front; nothing else to do (and no slow geom work needed) if no rows are left
    rows = [row for row in list_loc if row is not None]
    if not rows:
        list_loc[:] = rows
        return

    # index location geoms using the OSMID array as a unique identifier to calculate shapes
    # dict_geom_index = dict <- calc_geom_index() = { tupleOSMID : [ ( shapely_geom_prepared, shapely_envelope, shapely_geom, shapely_representative_point, dict_OSM_tag, geom_serialized ), ... ] }
    dict_geom_index = calc_geom_index( rows, index_geom = index_geom, index_id = index_osm_id, index_osm_tag = None )

    # pass 1 : look up the shape of each row once and record the largest area seen for each group
    candidates = []
    largest_area = {}
    for row in rows:
        # the geom index is keyed by tuple (same lookup as calc_location_confidence) so normalise list OSM IDs
        tuple_osmid = tuple(row[index_osm_id])
        str_geom = row[index_geom]
        phrase = (row[index_token_start], row[index_token_end])
        group = (phrase, tuple_osmid) if same_osmid_only else phrase

        # get the polygon for this location (osmids can have multiple polygons so get right one e.g. island groups)
        shape = next(
            (geom_data[2] for geom_data in dict_geom_index[tuple_osmid] if geom_data[5] == str_geom),
            None)
        if shape is None:
            # if no shape is available the row is always removed
            continue

        # note: points and lines have a zero area
        area = shape.area
        candidates.append((row, group, area))
        if (group not in largest_area) or (area > largest_area[group]):
            largest_area[group] = area

    # pass 2 : keep only the largest row(s) of each group.
    # slice assignment keeps the caller's list object (the function works in place)
    list_loc[:] = [row for row, group, area in candidates if area >= largest_area[group]]

    # all done
    return
def filter_matches_by_region_of_interest( list_loc, list_regions_of_interest, dict_geospatial_config, index_osm_parents = 7 ) :
    """
    filter a list of location matches, in place, by region of interest. a match is kept only if at least one of the region of interest IDs appears in its list of parent (super region) OSM IDs. entries that are None are dropped as well.

    :param list list_loc: list of location matches; each row must hold a container of parent OSM IDs at position index_osm_parents. this list is modified in place (rows without a parent in the regions of interest are removed)
    :param list list_regions_of_interest: non-empty list of OSM IDs for the regions of interest
    :param dict dict_geospatial_config: geoparse config object (only type checked here)
    :param int index_osm_parents: column index in each list_loc row of the parent OSM IDs

    :raises Exception: if an argument has the wrong type or list_regions_of_interest is empty
    """

    # check args without defaults
    if not isinstance( list_loc, list ) :
        raise Exception( 'invalid list_loc' )
    if not isinstance( list_regions_of_interest, list ) :
        raise Exception( 'invalid list_regions_of_interest' )
    if not isinstance( dict_geospatial_config, dict ) :
        raise Exception( 'invalid dict_geospatial_config' )

    # an empty region list is treated as a caller error rather than 'remove everything'
    if len( list_regions_of_interest ) == 0 :
        raise Exception( 'empty list_regions_of_interest' )

    # rebuild the list contents in a single pass. slice assignment keeps the
    # caller's list object (in-place semantics) and avoids the quadratic cost
    # of repeatedly calling count() / remove() to strip rejected rows.
    list_loc[:] = [
        loc for loc in list_loc
        if loc is not None
        and any( region_id in loc[index_osm_parents] for region_id in list_regions_of_interest )
    ]

    # all done
    return
def calc_multilingual_osm_name_set( dict_osm_tags, dict_geospatial_config ) :
    """
    collect the name variants of a location from its OSM tags. language specific names (for every language code in dict_geospatial_config['lang_codes'], in that order) come first, followed by the native name, reference / ISO codes and finally the alternative, short and other generic name tags. duplicate values are only reported once.

    :param dict dict_osm_tags: OSM tags for this location
    :param dict dict_geospatial_config: geoparse config object; the 'lang_codes' entry is read

    :return: list of unique name variants, ordered by tag priority
    :rtype: list

    :raises Exception: if either argument is not a dict
    """

    # check args without defaults
    if not isinstance( dict_osm_tags, dict ) :
        raise Exception( 'invalid dict_osm_tags' )
    if not isinstance( dict_geospatial_config, dict ) :
        raise Exception( 'invalid dict_geospatial_config' )

    # language specific tags go first so that the first supported language
    # provides the leading (most human readable) name; the native name follows
    candidate_tags = []
    for lang_code in dict_geospatial_config['lang_codes'] :
        candidate_tags += [ 'name:' + lang_code, 'alt name:' + lang_code, 'old name:' + lang_code ]
    candidate_tags += [
        'name', 'ref', 'loc ref', 'nat ref', 'old ref', 'reg ref',
        'ISO3166-1', 'ISO3166-1:alpha2', 'ISO3166-1:alpha3',
    ]
    candidate_tags += [
        'alt name', 'int name', 'loc name', 'nat name', 'old name', 'reg name',
        'short name', 'name:abbreviation', 'name:simple', 'sorting name',
    ]

    # walk the tags in priority order, keeping the first occurrence of each value
    names = []
    for tag in candidate_tags :
        if tag not in dict_osm_tags :
            continue
        value = dict_osm_tags[ tag ]
        if value not in names :
            names.append( value )

    return names
def calc_best_osm_name( target_lang, dict_osm_tags, dict_geospatial_config ) :
    """
    pick the most suitable name for a location given a preferred language. the tags are tried in this order: 'name:<lang>', 'alt name:<lang>', 'old name:<lang>', then (only when the target language is 'en') 'sorting name', and finally the plain 'name' tag.

    :param str target_lang: language code of preference for name
    :param dict dict_osm_tags: OSM tags for this location
    :param dict dict_geospatial_config: geoparse config object (only type checked here)

    :return: value of the first tag found, in the priority order above
    :rtype: str

    :raises Exception: if an argument has the wrong type, or none of the candidate tags is present
    """

    # check args without defaults
    if not isinstance( target_lang, str ) :
        raise Exception( 'invalid target_lang' )
    if not isinstance( dict_osm_tags, dict ) :
        raise Exception( 'invalid dict_osm_tags' )
    if not isinstance( dict_geospatial_config, dict ) :
        raise Exception( 'invalid dict_geospatial_config' )

    # names in the target language take priority
    tag_priority = [
        'name:' + target_lang,
        'alt name:' + target_lang,
        'old name:' + target_lang,
    ]

    # a sorting name is only consulted for english
    if target_lang == 'en' :
        tag_priority.append( 'sorting name' )

    # last resort is the basic name tag
    tag_priority.append( 'name' )

    for tag in tag_priority :
        if tag in dict_osm_tags :
            return dict_osm_tags[ tag ]

    # fail! no usable name tag at all
    raise Exception( 'location found without a sensible name : ' + repr(dict_osm_tags) )
def calc_inverted_index( list_data, dict_geospatial_config, index_phrase = 6, index_id = 2 ) :
    """
    compile an inverted index from a list of arbitrary data rows where one column holds a phrase (or a list / tuple of phrases). each phrase is tokenized with soton_corenlppy.common_parse_lib.tokenize_sentence() and the resulting token tuple e.g. ('new','york') is used as the key. the value is the set of IDs of every row containing that phrase.

    | note: the default index values are documented upstream as matching the cached location rows from geo_preprocess_lib.cache_preprocessed_locations()

    :param list list_data: list of rows to index
    :param dict dict_geospatial_config: geoparse config object, passed through to the tokenizer
    :param int index_phrase: column index in each row of the phrase text (str OR list/tuple of str)
    :param int index_id: column index in each row of the ID the inverted index will point to (must be hashable, as it is stored in a set). A value of None means the row index within list_data is used as the ID

    :return: inverted index where key = phrase as a token tuple, value = set of IDs
    :rtype: dict

    :raises Exception: if an argument has the wrong type, a column index is out of range for the first row, or a phrase value is not a list, tuple or str
    """

    # check args without defaults
    if not isinstance( list_data, list ) :
        raise Exception( 'invalid list_data' )
    if not isinstance( dict_geospatial_config, dict ) :
        raise Exception( 'invalid dict_geospatial_config' )

    # anything to do ?
    if len( list_data ) == 0 :
        return {}

    # check index OK (only the first row is inspected; rows are assumed to be uniform)
    if len( list_data[0] ) <= index_phrase :
        raise Exception( 'phrase index >= len(source data)' )
    if (index_id is not None) and (len( list_data[0] ) <= index_id) :
        raise Exception( 'ID index >= len(source data)' )

    dictInvertedIndex = {}
    for nDocIndex, rowData in enumerate( list_data ) :

        # normalise the phrase column into a list of phrases
        objPhrase = rowData[index_phrase]
        if isinstance( objPhrase, list ) :
            listPhrase = objPhrase
        elif isinstance( objPhrase, tuple ) :
            listPhrase = list( objPhrase )
        elif isinstance( objPhrase, str ) :
            listPhrase = [objPhrase]
        else :
            raise Exception( 'object type of phrase not list,tuple,str,unicode' )

        # rows without any phrase contribute nothing (and their ID column is never read)
        if not listPhrase :
            continue

        # the ID is the same for every phrase of this row, so resolve it once
        objIndex = nDocIndex if index_id is None else rowData[index_id]

        for strPhrase in listPhrase :
            # calc a ngram token tuple for this phrase and register the row ID under it
            tuplePhrase = tuple( soton_corenlppy.common_parse_lib.tokenize_sentence( str(strPhrase), dict_geospatial_config ) )
            dictInvertedIndex.setdefault( tuplePhrase, set() ).add( objIndex )

    # return inverted index
    return dictInvertedIndex
def calc_osmid_lookup( cached_locations, index_osm_id = 2 ) :
    """
    create an index of OSM ID to row indexes in cached_locations. an OSM ID can map to several rows (the upstream documentation gives island groups, with one geom per row, as an example).

    :param list cached_locations: list of location rows; each row must hold an iterable OSM ID at position index_osm_id
    :param int index_osm_id: column index in each row of the OSM ID (default 2, the position previously hard-coded)

    :return: lookup table mapping each OSM ID (converted to a tuple) to the set of row indexes that carry it
    :rtype: dict

    :raises Exception: if cached_locations is not a list
    """

    # check args without defaults
    if not isinstance( cached_locations, list ) :
        raise Exception( 'invalid cached_locations' )

    osmid_lookup = {}
    for nIndexLoc, rowLoc in enumerate( cached_locations ) :
        # IDs are converted to tuples so they are hashable and list / tuple IDs share a key
        tupleID = tuple( rowLoc[index_osm_id] )
        osmid_lookup.setdefault( tupleID, set() ).add( nIndexLoc )

    return osmid_lookup