#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
..
/////////////////////////////////////////////////////////////////////////
//
// (c) Copyright University of Southampton, 2012
//
// Copyright in this software belongs to University of Southampton,
// Highfield, Southampton, SO17 1BJ, United Kingdom
//
// This software may not be used, sold, licensed, transferred, copied
// or reproduced in whole or in part in any manner or form or in or
// on any media by any person other than in accordance with the terms
// of the Licence Agreement supplied with the software, or otherwise
// without the prior written consent of the copyright owners.
//
// This software is distributed WITHOUT ANY WARRANTY, without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE, except where stated in the Licence Agreement supplied with
// the software.
//
// Created By : Stuart E. Middleton
// Created Date : 2014/05/13
// Created for Project: REVEAL
//
/////////////////////////////////////////////////////////////////////////
//
// Dependencies: None
//
/////////////////////////////////////////////////////////////////////////
Pre-processing module to create and populate SQL tables for focus areas from an installed OpenStreetMap database. Pre-processed SQL tables are required by geoparsepy.geo_parse_lib.
global focus area spec (required before a focus area is preprocessed, as it contains super region information)::
{
'focus_area_id' : 'global_cities',
}
focus area spec (OSM IDs)::
{
'focus_area_id' : 'gr_paris',
'osmid': ['relation',71525, 'relation', 87002, 'relation', 86999, 'relation', 86985, 'relation', 85802, 'relation', 91776, 'relation', 72258, 'relation', 72148, 'relation', 31340, 'relation', 72020, 'relation', 85527, 'relation', 59321, 'relation', 37027, 'relation', 37026, 'relation', 104479, 'relation', 105122, 'relation', 105748, 'relation', 104868, 'relation', 108318, 'relation', 129550, 'relation', 130544, 'relation', 67826, 'relation', 67685, 'relation', 87628, 'relation', 87922],
'admin_lookup_table' : 'global_cities_admin',
}
focus area spec (name and super regions)::
{
'focus_area_id' : 'southampton',
'admin': ['southampton','south east england', 'united kingdom'],
'admin_lookup_table' : 'global_cities_admin',
}
focus area spec (radius from point)::
{
'focus_area_id' : 'oxford',
'radius': ['POINT(-1.3176274 51.7503955)', 0.2],
'admin_lookup_table' : 'global_cities_admin',
}
focus area spec (geom)::
{
'focus_area_id' : 'solent_shipping_lane',
'geom': 'POLYGON(( ... ))',
'admin_lookup_table' : 'global_cities_admin',
}
focus area spec (places with only a point within a set of super regions)::
{
'focus_area_id' : 'uk_places',
'place_types': ['suburb','quarter','neighbourhood','town','village','island','islet','archipelago'],
'parent_osmid': ['relation',62149],
'admin_lookup_table' : 'global_cities_admin',
}
{
'focus_area_id' : 'europe_places',
'place_types': ['suburb','quarter','neighbourhood','town','village','island','islet','archipelago'],
'parent_osmid': ['relation',62149, 'relation', 1403916, 'relation', 1311341, 'relation', 295480, 'relation', 9407, 'relation', 50046, 'relation', 2323309, 'relation', 51477, 'relation', 365331, 'relation', 51701, 'relation', 2978650, 'relation', 54224, 'relation', 52822, 'relation', 62273, 'relation', 51684, 'relation', 79510, 'relation', 72594, 'relation', 72596, 'relation', 49715, 'relation', 59065, 'relation', 60189, 'relation', 16239, 'relation', 218657, 'relation', 21335, 'relation', 14296, 'relation', 214885, 'relation', 1741311, 'relation', 2528142, 'relation', 90689, 'relation', 58974, 'relation', 60199, 'relation', 53296, 'relation', 2088990, 'relation', 53292, 'relation', 53293, 'relation', 186382, 'relation', 192307, 'relation', 174737, 'relation', 307787, 'relation', 1124039, 'relation', 365307, 'relation', 1278736],
'admin_lookup_table' : 'global_cities_admin',
}
note: uk, france, spain, portugal, andorra, denmark, netherlands, germany, italy, switzerland, norway, finland, sweden, ireland, czech republic, estonia, latvia, lithuania, poland, belarus, russia, austria, slovenia, hungary, slovakia, croatia, serbia, bosnia and herzegovina, romania, moldova, ukraine, montenegro, kosovo, albania, macedonia, bulgaria, greece, turkey, cyprus, monaco, malta, gibraltar
focus area spec (global places with only a point)::
{
'focus_area_id' : 'global_places',
'place_types': ['suburb','quarter','neighbourhood','town','village','island','islet','archipelago'],
'admin_lookup_table' : 'global_cities_admin',
}
Performance notes:
* Preprocessing time is related to the number of points/lines/polygons (N) and the number of admin regions (M). Admin regions are cross-checked for containment, so there are N*M calculations to perform.
* global_cities (300,000 polygons) takes about 3 days to compute on a 2 GHz CPU (it only needs to be done once, of course).
* uk_places (20,000 points) takes 20 mins.
* france_places (40,000 points) takes 2 hours.
* europe_places (420,000 points) is made up of each European country. This allows a sequential country-by-country calculation, which reduces the size of M and is vastly quicker than global places. It takes 7 hours to compute.
* north_america_places (usa and canada) (52,000 points) takes 1 hour to compute.
* au_nz_places (australia and new zealand) (8,000 points) takes 3 mins to compute.
Alternative approach is OSM reverse geocoding using Nominatim:
* Nominatim is a Linux GPL script/lib used by OSM to create an indexed planet OSM dataset that can then be queried for geocoding and reverse geocoding (e.g. name -> OSM ID)
* an HTTP service is available via OSM, but this does not scale to large numbers of locations (throughput is too slow)
* a local deployment of Nominatim is possible, but indexing is built into the planet OSM deployment scripts (i.e. takes 10+ days to run) and is apparently very complex and difficult to get working
* see http://wiki.openstreetmap.org/wiki/Nominatim
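Example usage (a sketch only, assuming PostgresqlHandler connections are already open against a planet OSM database; the handler objects and the schema name 'reveal' are illustrative assumptions)::
dictGlobalSpec = { 'focus_area_id' : 'global_cities' }
dictFocusSpec = { 'focus_area_id' : 'southampton', 'admin' : ['southampton','south east england','united kingdom'], 'admin_lookup_table' : 'global_cities_admin' }
dictPool = { 'point' : dbHandler1, 'line' : dbHandler2, 'poly' : dbHandler3, 'admin' : dbHandler4 }
create_preprocessing_tables( dictGlobalSpec, dbHandler, 'reveal' )
execute_preprocessing_global( dictGlobalSpec, dictPool, 'reveal' )
create_preprocessing_tables( dictFocusSpec, dbHandler, 'reveal' )
dictLocIDRange = execute_preprocessing_focus_area( dictFocusSpec, dictPool, 'reveal' )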
"""
import os, sys, logging, traceback, codecs, datetime, copy, threading, queue, time
import soton_corenlppy, geoparsepy
import psycopg2, shapely, shapely.speedups, shapely.prepared, shapely.wkt, shapely.geometry
def create_preprocessing_tables( focus_area, database_handle, schema, table_point = 'point', table_line = 'line', table_poly = 'poly', table_admin = 'admin', timeout_statement = 60, timeout_overall = 60, delete_contents = False, logger = None ) :
"""
create preprocessing tables for a new focus area (if they do not already exist)
:param dict focus_area: focus area description to create tables for.
:param PostgresqlHandler.PostgresqlHandler database_handle: handle to database object
:param str schema: Postgres schema name under which tables will be created
:param str table_point: Table name suffix to append to focus area ID to make point table
:param str table_line: Table name suffix to append to focus area ID to make line table
:param str table_poly: Table name suffix to append to focus area ID to make polygon table
:param str table_admin: Table name suffix to append to focus area ID to make admin region table
:param int timeout_statement: number of seconds to allow each SQL statement
:param int timeout_overall: number of seconds total to allow each SQL statement (including retries)
:param bool delete_contents: if True the contents of any existing tables will be deleted
:param logging.Logger logger: logger object (optional)
"""
# check args without defaults
if not isinstance( focus_area, dict ) :
raise Exception( 'invalid focus_area' )
if not isinstance( database_handle, soton_corenlppy.PostgresqlHandler.PostgresqlHandler ) :
raise Exception( 'invalid database_handle' )
if not isinstance( schema, str ) :
raise Exception( 'invalid schema' )
# make database tables for preprocessing work
# note: psycopg2 wraps all string parameters in quotes regardless of where they appear in the SQL statement.
# schema and table names cannot be quoted in PostgreSQL, so we add schema and table names manually into the SQL string rather than using parameterization
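# e.g. (illustrative only) cur.mogrify( 'SELECT * FROM %s', ('myschema.mytable',) ) returns
#      b"SELECT * FROM 'myschema.mytable'" which is invalid SQL, as identifiers cannot be
#      quoted like string literals; hence str.format() is used for identifiers below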
listStatementsTables = []
listStatementsDeletes = []
if 'focus_area_id' in focus_area :
strFocusAreaID = focus_area['focus_area_id']
else :
raise Exception( 'focus_area_id key not found in jsonFocusArea dict' )
strTableSQL = """
CREATE SCHEMA IF NOT EXISTS {:s};
""".format( schema )
listStatementsTables.append( (strTableSQL,None) )
strTableSQL = """
CREATE TABLE IF NOT EXISTS {:s}.{:s} (
loc_id serial,
name text,
osm_id_set bigint[],
admin_regions bigint[],
geom geometry(Geometry,4326),
tags hstore,
CONSTRAINT {:s}_pkey PRIMARY KEY (loc_id)
)
WITH (
OIDS=FALSE
);
""".format( schema,strFocusAreaID + '_' + table_point,strFocusAreaID + '_' + table_point )
# note: we use a SQL function to trap the exception in case the GIST index already exists, as Postgres 9.3 does not support IF NOT EXISTS on CREATE INDEX
# see http://dba.stackexchange.com/questions/35616/create-index-if-it-does-not-exist
strIndexSQL = """
CREATE OR REPLACE FUNCTION create_index() RETURNS void AS $$
BEGIN
CREATE INDEX {:s} ON {:s}.{:s} USING GIST ( geom );
EXCEPTION
WHEN undefined_table THEN
RETURN;
WHEN duplicate_table THEN
RETURN;
END;
$$ LANGUAGE plpgsql;
SELECT create_index();
""".format( strFocusAreaID + '_' + table_point + '_gist_index',schema,strFocusAreaID + '_' + table_point )
strSQLDelete = """
DELETE FROM {:s}.{:s};
""".format( schema,strFocusAreaID + '_' + table_point )
listStatementsTables.append( (strTableSQL,None) )
listStatementsTables.append( (strIndexSQL,None) )
if delete_contents == True :
listStatementsDeletes.append( (strSQLDelete,None) )
strTableSQL = """
CREATE TABLE IF NOT EXISTS {:s}.{:s} (
loc_id serial,
name text,
osm_id_set bigint[],
admin_regions bigint[],
geom geometry(Geometry,4326),
tags hstore,
CONSTRAINT {:s}_pkey PRIMARY KEY (loc_id)
)
WITH (
OIDS=FALSE
);
""".format( schema,strFocusAreaID + '_' + table_line,strFocusAreaID + '_' + table_line )
strIndexSQL = """
CREATE OR REPLACE FUNCTION create_index() RETURNS void AS $$
BEGIN
CREATE INDEX {:s} ON {:s}.{:s} USING GIST ( geom );
EXCEPTION
WHEN undefined_table THEN
RETURN;
WHEN duplicate_table THEN
RETURN;
END;
$$ LANGUAGE plpgsql;
SELECT create_index();
""".format( strFocusAreaID + '_' + table_line + '_gist_index',schema,strFocusAreaID + '_' + table_line )
strSQLDelete = """
DELETE FROM {:s}.{:s};
""".format( schema,strFocusAreaID + '_' + table_line )
listStatementsTables.append( (strTableSQL,None) )
listStatementsTables.append( (strIndexSQL,None) )
if delete_contents == True :
listStatementsDeletes.append( (strSQLDelete,None) )
strTableSQL = """
CREATE TABLE IF NOT EXISTS {:s}.{:s} (
loc_id serial,
name text,
osm_id_set bigint[],
admin_regions bigint[],
geom geometry(Geometry,4326),
tags hstore,
CONSTRAINT {:s}_pkey PRIMARY KEY (loc_id)
)
WITH (
OIDS=FALSE
);
""".format( schema,strFocusAreaID + '_' + table_poly,strFocusAreaID + '_' + table_poly )
strIndexSQL = """
CREATE OR REPLACE FUNCTION create_index() RETURNS void AS $$
BEGIN
CREATE INDEX {:s} ON {:s}.{:s} USING GIST ( geom );
EXCEPTION
WHEN undefined_table THEN
RETURN;
WHEN duplicate_table THEN
RETURN;
END;
$$ LANGUAGE plpgsql;
SELECT create_index();
""".format( strFocusAreaID + '_' + table_poly + '_gist_index',schema,strFocusAreaID + '_' + table_poly )
strSQLDelete = """
DELETE FROM {:s}.{:s};
""".format( schema,strFocusAreaID + '_' + table_poly )
listStatementsTables.append( (strTableSQL,None) )
listStatementsTables.append( (strIndexSQL,None) )
if delete_contents == True :
listStatementsDeletes.append( (strSQLDelete,None) )
strTableSQL = """
CREATE TABLE IF NOT EXISTS {:s}.{:s} (
loc_id serial,
name text,
osm_id_set bigint[],
admin_regions bigint[],
geom geometry(Geometry,4326),
tags hstore,
CONSTRAINT {:s}_pkey PRIMARY KEY (loc_id)
)
WITH (
OIDS=FALSE
);
""".format( schema,strFocusAreaID + '_' + table_admin,strFocusAreaID + '_' + table_admin )
strIndexSQL = """
CREATE OR REPLACE FUNCTION create_index() RETURNS void AS $$
BEGIN
CREATE INDEX {:s} ON {:s}.{:s} USING GIST ( geom );
EXCEPTION
WHEN undefined_table THEN
RETURN;
WHEN duplicate_table THEN
RETURN;
END;
$$ LANGUAGE plpgsql;
SELECT create_index();
""".format( strFocusAreaID + '_' + table_admin + '_gist_index',schema,strFocusAreaID + '_' + table_admin )
strSQLDelete = """
DELETE FROM {:s}.{:s};
""".format( schema,strFocusAreaID + '_' + table_admin )
listStatementsTables.append( (strTableSQL,None) )
listStatementsTables.append( (strIndexSQL,None) )
if delete_contents == True :
listStatementsDeletes.append( (strSQLDelete,None) )
# note: Postgres does NOT provide concurrency support for CREATE TABLE so concurrent attempts WILL result
# in an SQL error for the 2nd attempt (actually on unique KEY validation check)
# see: http://www.postgresql.org/message-id/CA+TgmoZAdYVtwBfp1FL2sMZbiHCWT4UPrzRLNnX1Nb30Ku3-gg@mail.gmail.com
# solution: trap errors and ignore :(
# execute create table statements (using first connection as this is not multi-threaded)
try :
# reconnect as there seems to be a thread issue with the connection object (maybe connection object is not thread safe?)
database_handle.reconnect()
# execute SQL
database_handle.execute_sql_statement( listStatementsTables, timeout_statement, timeout_overall )
except :
if logger != None :
logger.warning( 'SQL error in create_preprocessing_tables() : this is probably NOT an error if concurrent attempts to make table failed : ' + repr(sys.exc_info()[0]) + ' : ' + repr(sys.exc_info()[1]) )
if delete_contents == True :
database_handle.execute_sql_statement( listStatementsDeletes, timeout_statement, timeout_overall )
def execute_preprocessing_focus_area( focus_area, database_pool, schema, table_point = 'point', table_line = 'line', table_poly = 'poly', table_admin = 'admin', timeout_statement = 1209600, timeout_overall = 1209600, logger = None ) :
"""
Populates preprocessing tables with locations for a new focus area. If the area has already been precomputed it will immediately return with the location ID range. Small areas (e.g. town) take a few minutes to compute. Large areas (e.g. city) take up to 1 hour to compute.
The database tables will be SQL locked whilst the data import is occurring to ensure new locations are allocated continuous primary key IDs,
allowing a simple result range (start ID and end ID) to be returned as opposed to a long list of IDs for each new location.
This function is process safe but not thread safe as it uses an internal Queue() object to coordinate multiple database cursors and efficiently insert data using parallel SQL inserts.
The global area (referenced using the focus area key 'admin_lookup_table') must already be created and available in the same schema as this new focus area.
If the table already exists and already has locations then no import will occur (assuming the area has already been preprocessed).
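Example call and return value (an illustrative sketch only; the handler objects, schema name and ID values are assumptions)::
dictPool = { 'point' : handlerPoint, 'line' : handlerLine, 'poly' : handlerPoly, 'admin' : handlerAdmin }
dictLocIDRange = execute_preprocessing_focus_area( dictFocusSpec, dictPool, 'reveal' )
dictLocIDRange == { 'southampton_point' : (1, 2450), 'southampton_line' : (1, 880), 'southampton_poly' : (1, 1630), 'southampton_admin' : (1, 12) }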
:param dict focus_area: focus area description
:param dict database_pool: pool of PostgresqlHandler.PostgresqlHandler objects for each of the 4 table types used to execute parallel SQL imports e.g. { 'admin' : PostgresqlHandler.PostgresqlHandler, ... }
:param str schema: Postgres schema name under which tables will be created
:param str table_point: Table name suffix to append to focus area ID to make point table
:param str table_line: Table name suffix to append to focus area ID to make line table
:param str table_poly: Table name suffix to append to focus area ID to make polygon table
:param str table_admin: Table name suffix to append to focus area ID to make admin region table
:param int timeout_statement: number of seconds to allow each SQL statement
:param int timeout_overall: number of seconds total to allow each SQL statement (including retries)
:param logging.Logger logger: logger object (optional)
:return: for each table a tuple with the new location primary key ID range e.g. { 'focus1_admin' : (nLocIDStart, nLocIDEnd), ... }
:rtype: dict
"""
# check args without defaults
if not isinstance( focus_area, dict ) :
raise Exception( 'invalid focus_area' )
if not isinstance( database_pool, dict ) :
raise Exception( 'invalid database_pool' )
if not isinstance( schema, str ) :
raise Exception( 'invalid schema' )
if logger != None :
logger.info( 'starting preprocessing of new focus area : ' + repr(focus_area) )
if not isinstance( focus_area, dict ) :
raise Exception( 'focus_area not a dict' )
if 'focus_area_id' in focus_area :
strFocusAreaID = focus_area['focus_area_id']
else :
raise Exception( 'focus_area_id key not found in focus_area dict' )
# tips on OpenGIS SQL
# ST_Collect is faster than ST_Union
# ST_Intersects is faster than ST_Contains
# operator && uses a bounding box and is the fastest of all (but will not match true geometry so use as a pre-filter)
# but read operation docs to understand exactly what each operator is doing!
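# e.g. the pre-filter pattern used throughout the SQL below (illustrative only):
#   WHERE ST_Envelope( a.geom ) && ST_Envelope( b.geom ) /* fast bounding box pre-filter */
#     AND ST_Intersects( a.geom, b.geom ) /* exact geometry check */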
dictNewLocations = {}
if 'admin_lookup_table' in focus_area :
strAdminLookupTable = focus_area['admin_lookup_table']
else :
strAdminLookupTable = None
tuplePlaceTypes = None
# parse query and construct the SQL to use for it
# different types of focus area request are supported and they translate to different SQL queries
listSQLInserts = []
# check focus area one of the required entries
if (not 'radius' in focus_area) and (not 'geom' in focus_area) and (not 'admin' in focus_area) and (not 'osmid' in focus_area) and (not 'place_types' in focus_area) :
raise Exception( 'unknown focus area type' )
#
# FOCUS AREA = RADIUS, GEOM, ADMIN, OSMID
#
if ('radius' in focus_area) or ('geom' in focus_area) or ('admin' in focus_area) or ('osmid' in focus_area) :
# get filters common to these four focus area types
strAdminName = None
tupleAdminParents = None
#
# make SQL to get the geometry area from which to find locations to preprocess
# geom => explicitly defined geom SQL
# radius => explicitly defined point and radius
# admin => name, admin, ... => geom lookup SQL
# osmid => type, osmid => geom lookup SQL
#
if 'geom' in focus_area :
strGeom = focus_area['geom']
strGeomDefinedSQL = """
SELECT ST_GeomFromText( '{:s}',4326 ) AS geom
""".format( strGeom )
elif 'radius' in focus_area :
listRadiusEntity = focus_area['radius']
if len( listRadiusEntity ) != 2 :
raise Exception( 'radius entry is not a list [geom_str,dist_float] (check JSON focus area is correct)' )
strGeom = listRadiusEntity[0]
nDistance = float( listRadiusEntity[1] )
# note: ST_DWithin() is much quicker than defining a geom then looking for overlaps
# BUT this approach allows commonality with the other ways to define a geom
# note: the units of radius are measured in units of the spatial reference system (i.e. degrees)
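# an ST_DWithin() formulation (not used here, shown for comparison only) would look like:
#   SELECT way FROM planet_osm_point WHERE ST_DWithin( way, ST_GeomFromText( 'POINT(-1.3176274 51.7503955)', 4326 ), 0.2 )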
strGeomDefinedSQL = """
SELECT ST_Buffer( ST_GeomFromText( '{:s}',4326 ), {:.12g} ) AS geom
""".format( strGeom, nDistance )
elif 'admin' in focus_area :
listAdminNames = focus_area['admin']
if len(listAdminNames) < 1 :
raise Exception('admin focus area does not have any names to find geom (check JSON focus area is correct)')
if len(listAdminNames) == 1 :
strAdminName = listAdminNames[0].lower()
tupleAdminParents = None
# only the first match is returned, as there will be many matches for any given location name
# this is a poor query!
strGeomDefinedSQL = """
SELECT
possible_match.geom AS geom
FROM
(
SELECT geom
FROM {:s}.{:s}
WHERE lower(name) = %s
) AS possible_match
LIMIT 1
""".format( schema, strAdminLookupTable )
else :
# note: a tuple is needed for the IN %s clause below (psycopg2 adapts a tuple to a record ('aaa','bbb',...), whereas a list would become ARRAY['aaa','bbb',...])
strAdminName = listAdminNames[0].lower()
tupleAdminParents = tuple( [x.lower() for x in listAdminNames[1:]] )
# use the global cities table to do an efficient lookup (otherwise it will take a very long time to match admin regions to the list)
strGeomDefinedSQL = """
SELECT
matches.geom AS geom
FROM (
SELECT
possible_match.osm_id_set AS osm_id_set,
possible_match.geom AS geom,
array_agg( lower({:s}.{:s}.name) ) AS admin_names
FROM {:s}.{:s},
(
SELECT
geom,
osm_id_set,
admin_regions
FROM {:s}.{:s}
WHERE lower(name) = %s
) AS possible_match
WHERE
{:s}.{:s}.osm_id_set != possible_match.osm_id_set
AND lower({:s}.{:s}.name) IN %s
AND {:s}.{:s}.osm_id_set && possible_match.admin_regions
GROUP BY possible_match.osm_id_set, possible_match.geom
) AS matches
ORDER BY array_length(matches.admin_names,1) DESC
LIMIT 1
""".format( schema, strAdminLookupTable, schema, strAdminLookupTable, schema, strAdminLookupTable, schema, strAdminLookupTable, schema, strAdminLookupTable, schema, strAdminLookupTable )
elif 'osmid' in focus_area :
strGeomDefinedSQL = """
SELECT way AS geom FROM {:s} WHERE
""".format( 'planet_osm_polygon' )
listOSMEntity = focus_area['osmid']
nIndexOSM = 0
while nIndexOSM < len( listOSMEntity ) :
if nIndexOSM + 1 >= len( listOSMEntity ) :
raise Exception( 'osmid list is of incorrect format (check JSON focus area is correct)' )
# note: only relations make sense to get a closed geometry object as these
# are entered into the planet_osm_polygon table with a negative id
strOSMType = listOSMEntity[nIndexOSM]
nOSMID = int( listOSMEntity[nIndexOSM+1] )
if strOSMType != 'relation' :
raise Exception( 'unknown focus area OSMID type (expected relation) got (' + strOSMType + ')' )
if nIndexOSM == 0 :
strGeomDefinedSQL = strGeomDefinedSQL + ' osm_id = {:d} '.format( nOSMID * -1 )
else :
strGeomDefinedSQL = strGeomDefinedSQL + ' OR osm_id = {:d} '.format( nOSMID * -1 )
nIndexOSM = nIndexOSM + 2
# prepare tuple for geom WITH data (other SQL will not need data so we can do this here)
listTuple = []
if strAdminName != None :
if tupleAdminParents != None :
listTuple = [strAdminName, tupleAdminParents]
else :
listTuple = [strAdminName]
tupleSQLParams = tuple( listTuple )
#
# ADMIN
#
# get the current size of the table.
# if this is not 0 then we simply do no preprocessing and return the min and max primary key ID values
# because the focus area has already been preprocessed
strSQL = """SELECT MIN(loc_id), MAX(loc_id) FROM {:s}.{:s}""".format( schema,strFocusAreaID + '_' + table_admin )
listCount = database_pool[table_admin].execute_sql_query( (strSQL,None), timeout_statement, timeout_overall )
if len(listCount) != 1 :
raise Exception( 'admin table row count failed' )
# result = [(nMin,nMax)] or [(None,None)] if no rows present
nRowStart = listCount[0][0]
nRowEnd = listCount[0][1]
bTableEmpty = False
if (nRowStart == None) or (nRowEnd == None) :
bTableEmpty = True
if bTableEmpty == True :
#
# admin regions (polygons and lines closed and made into polygons) within the specified geom
# note: we reuse the precomputed global admin levels to ensure (a) it is quick (b) lines and polygons are used
# note: we do not consider points (e.g. places like suburbs) as they do not have geometry information useful for calculating subregions, which is the purpose of admin area entries
# note: we deliberately do NOT check for existing locations (and update them) as we are expected to
# run in parallel across multiple databases so will handle duplicates at the geoparse aggregation stage
# this allows global database (no parent admin regions) to be made quickly then extra (duplicate) locations added
# with parent admin regions filled in later on
#
strSQL = """
WITH
geom_defined AS (
{:s}
),
admin_all AS (
SELECT
name,
osm_id_set,
admin_table.geom AS geom,
admin_regions,
tags
FROM {:s}.{:s} AS admin_table, geom_defined
WHERE
ST_IsValid( admin_table.geom )
AND ST_IsValid( geom_defined.geom )
AND ST_Envelope( admin_table.geom ) && ST_Envelope( geom_defined.geom )
AND ST_Intersects( admin_table.geom, geom_defined.geom )
),
target_all AS (
SELECT
name,
osm_id_set,
admin_table.geom,
admin_regions,
tags
FROM {:s}.{:s} AS admin_table, geom_defined
WHERE
ST_IsValid( admin_table.geom )
AND ST_IsValid( geom_defined.geom )
AND ST_Envelope( admin_table.geom ) && ST_Envelope( geom_defined.geom )
AND ST_Intersects( admin_table.geom, geom_defined.geom )
),
regions_to_add AS (
SELECT
target_with_super_region.name AS name,
target_with_super_region.osm_id_set AS osm_id_set,
array_agg( target_with_super_region.super_region ) AS admin_regions,
target_with_super_region.union_way AS geom,
target_with_super_region.tags AS tags
FROM
(
/* super region = any admin polygon that contains target polygon */
SELECT
target_all.name AS name,
target_all.osm_id_set AS osm_id_set,
unnest(admin_all.osm_id_set) AS super_region,
target_all.geom AS union_way,
target_all.tags AS tags
FROM target_all, admin_all
WHERE
ST_Envelope( admin_all.geom ) && ST_Envelope( target_all.geom )
AND ST_Contains( admin_all.geom, target_all.geom )
UNION
/* super region = target id set so that all targets appear even if they have no super regions and don't match the previous query (e.g. UK) */
SELECT
target_all.name AS name,
target_all.osm_id_set AS osm_id_set,
unnest( target_all.osm_id_set ) AS super_region,
target_all.geom AS union_way,
target_all.tags AS tags
FROM target_all
) AS target_with_super_region
GROUP BY target_with_super_region.name, target_with_super_region.osm_id_set, target_with_super_region.union_way, target_with_super_region.tags
)
INSERT INTO {:s}.{:s} ( name,osm_id_set,admin_regions,geom,tags )
SELECT
regions_to_add.name,
regions_to_add.osm_id_set,
regions_to_add.admin_regions,
regions_to_add.geom,
regions_to_add.tags
FROM regions_to_add
RETURNING loc_id;
""".format( strGeomDefinedSQL, schema, strAdminLookupTable, schema, strAdminLookupTable, schema, strFocusAreaID + '_' + table_admin )
listSQLInserts.append( (table_admin,strSQL,tupleSQLParams,logger,database_pool[table_admin],timeout_statement, timeout_overall) )
else :
# admin table already exists so no SQL needed
# but we still need to report the existing locations
# as if they were just populated so subsequent code (e.g. geoparse) can process them
dictNewLocations[ strFocusAreaID + '_' + table_admin ] = (nRowStart, nRowEnd)
#
# POLY : polygons NOT admin regions
#
# get the current size of the table.
# if this is not 0 then we simply do no preprocessing and return the min and max primary key ID values
# because the focus area has already been preprocessed
strSQL = """SELECT MIN(loc_id), MAX(loc_id) FROM {:s}.{:s}""".format( schema,strFocusAreaID + '_' + table_poly )
listCount = database_pool[table_poly].execute_sql_query( (strSQL,None), timeout_statement, timeout_overall )
if len(listCount) != 1 :
raise Exception( 'poly table row count failed' )
# result = [(nMin,nMax)] or [(None,None)] if no rows present
nRowStart = listCount[0][0]
nRowEnd = listCount[0][1]
bTableEmpty = False
if (nRowStart == None) or (nRowEnd == None) :
bTableEmpty = True
if bTableEmpty == True :
strSQL = """
WITH
geom_defined AS (
{:s}
),
admin_all AS (
SELECT
name,
osm_id_set,
admin_table.geom AS geom,
admin_regions,
tags
FROM {:s}.{:s} AS admin_table, geom_defined
WHERE
ST_IsValid( admin_table.geom )
AND ST_IsValid( geom_defined.geom )
AND ST_Envelope( admin_table.geom ) && ST_Envelope( geom_defined.geom )
AND ST_Intersects( admin_table.geom, geom_defined.geom )
)
INSERT INTO {:s}.{:s} ( name,osm_id_set,admin_regions,geom,tags )
SELECT
expanded_admin.name AS name,
expanded_admin.osm_id_set AS osm_id_set,
array_agg( expanded_admin.admin_regions ) AS admin_regions,
expanded_admin.geom AS geom,
expanded_admin.tags AS tags
FROM
(
/* polygon entry with a single way id */
SELECT
name_poly AS name,
unique_poly.osm_id_set AS osm_id_set,
unnest( admin_all.osm_id_set ) AS admin_regions,
unique_poly.union_way AS geom,
hstore( string_to_array( way_tags_serialized, '###' ) ) AS tags
FROM geom_defined,admin_all,
(
SELECT name AS name_poly,
array_agg( osm_id ) AS osm_id_set,
ST_Buffer( ST_Collect( array_agg( planet_osm_polygon.way ) ), 0 ) AS union_way,
string_agg( array_to_string( planet_osm_ways.tags,'###' ),'###' ) AS way_tags_serialized
FROM planet_osm_polygon, planet_osm_ways, geom_defined
WHERE
name IS NOT NULL
AND planet_osm_polygon.boundary IS NULL
AND planet_osm_polygon.place IS NULL
AND ST_IsValid( planet_osm_polygon.way )
AND ST_IsValid( geom_defined.geom )
AND ST_Envelope( planet_osm_polygon.way ) && ST_Envelope( geom_defined.geom )
AND ST_Intersects( planet_osm_polygon.way, geom_defined.geom )
AND planet_osm_ways.id = osm_id
GROUP BY name
) AS unique_poly
WHERE
ST_Envelope(admin_all.geom) && ST_Envelope(unique_poly.union_way)
AND ST_Contains( admin_all.geom, unique_poly.union_way )
UNION
/* polygon with a rel entry, such as relations with multipolygons where several ways id's represent different parts of the relation shape */
SELECT
name_poly AS name,
unique_poly.osm_id_set AS osm_id_set,
unnest( admin_all.osm_id_set ) AS admin_regions,
unique_poly.union_way AS geom,
hstore( string_to_array( way_tags_serialized, '###' ) ) AS tags
FROM geom_defined,admin_all,
(
SELECT
rel_data.name_poly AS name_poly,
rel_data.osm_id_set AS osm_id_set,
rel_data.union_way AS union_way,
string_agg( array_to_string( planet_osm_ways.tags,'###' ),'###' ) AS way_tags_serialized
FROM planet_osm_ways,
(
SELECT name AS name_poly,
array_agg( osm_id ) AS osm_id_set,
ST_Buffer( ST_Collect( array_agg( planet_osm_polygon.way ) ), 0 ) AS union_way,
planet_osm_rels.parts AS rel_way_ids
FROM planet_osm_polygon, planet_osm_rels, geom_defined
WHERE
name IS NOT NULL
AND planet_osm_polygon.boundary IS NULL
AND planet_osm_polygon.place IS NULL
AND ST_IsValid( planet_osm_polygon.way )
AND ST_IsValid( geom_defined.geom )
AND ST_Envelope( planet_osm_polygon.way ) && ST_Envelope( geom_defined.geom )
AND ST_Intersects( planet_osm_polygon.way, geom_defined.geom )
AND planet_osm_rels.id = -1 * osm_id
GROUP BY name, planet_osm_rels.parts
) AS rel_data
WHERE
planet_osm_ways.id = ANY (rel_data.rel_way_ids)
AND planet_osm_ways.tags IS NOT NULL
GROUP BY
rel_data.name_poly, rel_data.osm_id_set, rel_data.union_way, planet_osm_ways.tags
) AS unique_poly
WHERE
ST_Envelope(admin_all.geom) && ST_Envelope(unique_poly.union_way)
AND ST_Contains( admin_all.geom, unique_poly.union_way )
) AS expanded_admin
GROUP BY expanded_admin.name, expanded_admin.osm_id_set, expanded_admin.geom, expanded_admin.tags
RETURNING loc_id;
""".format( strGeomDefinedSQL, schema, strAdminLookupTable, schema, strFocusAreaID + '_' + table_poly )
listSQLInserts.append( (table_poly,strSQL,tupleSQLParams,logger,database_pool[table_poly],timeout_statement, timeout_overall) )
else :
# poly table already exists so no SQL needed
# but we still need to report the existing locations
# as if they were just populated so subsequent code (e.g. geoparse) can process them
dictNewLocations[ strFocusAreaID + '_' + table_poly ] = (nRowStart, nRowEnd)
#
# LINES
#
# get the current size of the table.
# if this is not 0 then we simply do no preprocessing and return the min and max primary key ID values
# because the focus area has already been preprocessed
strSQL = """SELECT MIN(loc_id), MAX(loc_id) FROM {:s}.{:s}""".format( schema,strFocusAreaID + '_' + table_line )
listCount = database_pool[table_line].execute_sql_query( (strSQL,None), timeout_statement, timeout_overall )
if len(listCount) != 1 :
raise Exception( 'line table row count failed' )
# result = [(nMin,nMax)] or [(None,None)] if no rows present
nRowStart = listCount[0][0]
nRowEnd = listCount[0][1]
bTableEmpty = False
if (nRowStart == None) or (nRowEnd == None) :
bTableEmpty = True
if bTableEmpty == True :
strSQL = """
WITH
geom_defined AS (
{:s}
),
admin_all AS (
SELECT
name,
osm_id_set,
admin_table.geom AS geom,
admin_regions,
tags
FROM {:s}.{:s} AS admin_table, geom_defined
WHERE
ST_IsValid( admin_table.geom )
AND ST_IsValid( geom_defined.geom )
AND ST_Envelope( admin_table.geom ) && ST_Envelope( geom_defined.geom )
AND ST_Intersects( admin_table.geom, geom_defined.geom )
)
INSERT INTO {:s}.{:s} ( name,osm_id_set,admin_regions,geom,tags )
SELECT
expanded_admin.name AS name,
expanded_admin.osm_id_set AS osm_id_set,
array_agg( expanded_admin.admin_regions ) AS admin_regions,
expanded_admin.geom AS geom,
expanded_admin.tags AS tags
FROM
(
SELECT
name_line AS name,
unique_line.osm_id_set AS osm_id_set,
unnest( admin_all.osm_id_set ) AS admin_regions,
unique_line.union_way AS geom,
hstore( string_to_array( way_tags_serialized, '###' ) ) AS tags
FROM admin_all,geom_defined,
(
SELECT name AS name_line,
array_agg( osm_id ) AS osm_id_set,
ST_Collect( array_agg( way ) ) AS union_way,
string_agg( array_to_string( planet_osm_ways.tags,'###' ),'###' ) AS way_tags_serialized
FROM planet_osm_line, planet_osm_ways, geom_defined
WHERE
name IS NOT NULL
AND planet_osm_ways.id = osm_id
AND ST_IsValid( planet_osm_line.way )
AND ST_IsValid( geom_defined.geom )
AND ST_Envelope( planet_osm_line.way ) && ST_Envelope( geom_defined.geom )
AND ST_Intersects( planet_osm_line.way, geom_defined.geom )
GROUP BY name
) AS unique_line
WHERE
ST_Envelope(admin_all.geom) && ST_Envelope( unique_line.union_way )
AND ST_Contains( admin_all.geom, unique_line.union_way )
) AS expanded_admin
GROUP BY expanded_admin.name, expanded_admin.osm_id_set, expanded_admin.geom, expanded_admin.tags
RETURNING loc_id;
""".format( strGeomDefinedSQL, schema, strAdminLookupTable, schema, strFocusAreaID + '_' + table_line )
listSQLInserts.append( (table_line,strSQL,tupleSQLParams,logger,database_pool[table_line],timeout_statement, timeout_overall) )
else :
# line table already exists so no SQL needed
# but we still need to report the existing locations
# as if they were just populated so subsequent code (e.g. geoparse) can process them
dictNewLocations[ strFocusAreaID + '_' + table_line ] = (nRowStart, nRowEnd)
#
# POINT: points NOT admin regions
#
# get the current size of the table.
# if this is not 0 then we simply do no preprocessing and return the min and max primary key ID values
# because the focus area has already been preprocessed
strSQL = """SELECT MIN(loc_id), MAX(loc_id) FROM {:s}.{:s}""".format( schema,strFocusAreaID + '_' + table_point )
listCount = database_pool[table_point].execute_sql_query( (strSQL,None), timeout_statement, timeout_overall )
if len(listCount) != 1 :
raise Exception( 'point table row count failed' )
# result = [(nMin,nMax)] or [(None,None)] if no rows present
nRowStart = listCount[0][0]
nRowEnd = listCount[0][1]
bTableEmpty = False
if (nRowStart == None) or (nRowEnd == None) :
bTableEmpty = True
if bTableEmpty == True :
strSQL = """
WITH
geom_defined AS (
{:s}
),
admin_all AS (
SELECT
name,
osm_id_set,
admin_table.geom AS geom,
admin_regions,
tags
FROM {:s}.{:s} AS admin_table, geom_defined
WHERE
ST_IsValid( admin_table.geom )
AND ST_IsValid( geom_defined.geom )
AND ST_Envelope( admin_table.geom ) && ST_Envelope( geom_defined.geom )
AND ST_Intersects( admin_table.geom, geom_defined.geom )
)
INSERT INTO {:s}.{:s} ( name,osm_id_set,admin_regions,geom,tags )
SELECT
expanded_admin.name AS name,
expanded_admin.osm_id_set AS osm_id_set,
array_agg( expanded_admin.admin_regions ) AS admin_regions,
expanded_admin.geom AS geom,
expanded_admin.tags AS tags
FROM
(
SELECT
name_point AS name,
array[ unique_node.osm_id ] AS osm_id_set,
unnest( admin_all.osm_id_set ) AS admin_regions,
unique_node.way AS geom,
hstore( point_tags ) AS tags
FROM admin_all,geom_defined,
(
SELECT name AS name_point,
osm_id,
way,
tags AS point_tags
FROM public.planet_osm_point, geom_defined
WHERE
name IS NOT NULL
AND planet_osm_point.boundary IS NULL
AND planet_osm_point.place IS NULL
AND ST_IsValid( planet_osm_point.way )
AND ST_IsValid( geom_defined.geom )
AND ST_Envelope( planet_osm_point.way ) && ST_Envelope( geom_defined.geom )
AND ST_Intersects( planet_osm_point.way, geom_defined.geom )
GROUP BY name, osm_id, way, point_tags
) AS unique_node
WHERE
ST_Envelope( admin_all.geom ) && ST_Envelope( unique_node.way )
AND ST_Contains( admin_all.geom, unique_node.way )
) AS expanded_admin
GROUP BY expanded_admin.name, expanded_admin.osm_id_set, expanded_admin.geom, expanded_admin.tags
RETURNING loc_id;
""".format( strGeomDefinedSQL, schema, strAdminLookupTable, schema, strFocusAreaID + '_' + table_point )
listSQLInserts.append( (table_point,strSQL,tupleSQLParams,logger,database_pool[table_point], timeout_statement, timeout_overall) )
else :
# point table already exists so no SQL needed
# but we still need to report the existing locations
# as if they were just populated so subsequent code (e.g. geoparse) can process them
dictNewLocations[ strFocusAreaID + '_' + table_point ] = (nRowStart, nRowEnd)
#
# FOCUS FILTER = NODES WITH PLACE TYPES
#
if 'place_types' in focus_area :
# get the current size of the table (typically 1,000,000 nodes match the usual place types below city).
# if this is not 0 then we simply do no preprocessing and return the min and max primary key ID values
# because the focus area has already been preprocessed
strSQL = """SELECT MIN(loc_id), MAX(loc_id) FROM {:s}.{:s}""".format( schema,strFocusAreaID + '_' + table_point )
listCount = database_pool[table_point].execute_sql_query( (strSQL,None), timeout_statement, timeout_overall )
if len(listCount) != 1 :
raise Exception( 'places (point) table row count failed' )
# result = [(nMin,nMax)] or [(None,None)] if no rows present
nRowStart = listCount[0][0]
nRowEnd = listCount[0][1]
bTableEmpty = False
if (nRowStart == None) or (nRowEnd == None) :
bTableEmpty = True
if bTableEmpty == True :
# get super regions if any, otherwise assume it is a global places request
if 'parent_osmid' in focus_area :
# compute each parent_osmid separately, as the admin lookup cross-checks admin areas against places (N*M calculations); the more admin areas and places, the more cross-checks there are to perform
nIndexOSM = 0
while nIndexOSM < len( focus_area['parent_osmid'] ) :
if nIndexOSM + 1 >= len( focus_area['parent_osmid'] ) :
raise Exception( 'parent osmid list is of incorrect format (check JSON focus area is correct)' )
listIDs = []
# note: only relations make sense to get a closed geometry object as these
# are entered into the planet_osm_polygon table with a negative id
strOSMType = focus_area['parent_osmid'][nIndexOSM]
nOSMID = int( focus_area['parent_osmid'][nIndexOSM+1] )
if strOSMType != 'relation' :
raise Exception( 'unknown focus area parent OSMID type (expected relation) got (' + strOSMType + ')' )
listIDs.append( nOSMID * -1 )
nIndexOSM = nIndexOSM + 2
# check 'place' is in the tags array, then check one of the place types is also present
# OSM uses text[] not hstore() so we need to use the array overlap && operator, not a key and value check
# note: pass a LIST not a tuple for listIDs so it is converted to an ARRAY not a RECORD
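# e.g. (illustrative only) psycopg2 adapts the list [1,2] to ARRAY[1,2], but the tuple (1,2) to the record (1,2)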
tupleSQLParams = tuple( [ listIDs, listIDs, tuple(focus_area['place_types']) ] )
strSQL = """
WITH
admin_all AS (
SELECT
name,
osm_id_set,
geom,
admin_regions,
tags
FROM {:s}.{:s} AS table_admin
WHERE
table_admin.admin_regions && %s::bigint[]
AND ST_IsValid( table_admin.geom )
),
target_all AS (
SELECT
admin_all.geom
FROM admin_all
WHERE
admin_all.osm_id_set && %s::bigint[]
)
INSERT INTO {:s}.{:s} ( name,osm_id_set,admin_regions,geom,tags )
SELECT
expanded_admin.name AS name,
expanded_admin.osm_id_set AS osm_id_set,
array_agg( expanded_admin.admin_regions ) AS admin_regions,
expanded_admin.geom AS geom,
expanded_admin.tags AS tags
FROM
(
SELECT
unique_node.name AS name,
array[ unique_node.osm_id ] AS osm_id_set,
unnest( admin_all.osm_id_set ) AS admin_regions,
unique_node.way AS geom,
hstore( unique_node.tags ) AS tags
FROM admin_all,
(
SELECT
planet_osm_point.name AS name,
planet_osm_point.osm_id AS osm_id,
planet_osm_point.way AS way,
planet_osm_point.tags AS tags
FROM planet_osm_point, target_all
WHERE
planet_osm_point.name IS NOT NULL
AND planet_osm_point.place IN %s
AND ST_IsValid( planet_osm_point.way )
AND ST_Envelope( target_all.geom ) && planet_osm_point.way
AND ST_Contains( target_all.geom, planet_osm_point.way )
) AS unique_node
WHERE
ST_Envelope( admin_all.geom ) && ST_Envelope( unique_node.way )
AND ST_Contains( admin_all.geom, unique_node.way )
) AS expanded_admin
GROUP BY expanded_admin.name, expanded_admin.osm_id_set, expanded_admin.geom, expanded_admin.tags
RETURNING loc_id;
""".format( schema, strAdminLookupTable, schema, strFocusAreaID + '_' + table_point )
# add SQL to list
listSQLInserts.append( (table_point,strSQL,tupleSQLParams,logger,database_pool[table_point], timeout_statement, timeout_overall) )
else :
# first check 'place' is in the tags array, then check one of the place types is also present
# OSM uses text[] not hstore() so we need to use the array overlap && operator, not a key and value check
# note: this takes too long to test, but in the future machines might be quick enough to run it in under 2 weeks!
tupleSQLParams = tuple( [ tuple(focus_area['place_types']) ] )
strSQL = """
WITH
admin_all AS (
SELECT
name,
osm_id_set,
geom,
admin_regions,
tags
FROM {:s}.{:s} AS table_admin
WHERE
ST_IsValid( table_admin.geom )
)
INSERT INTO {:s}.{:s} ( name,osm_id_set,admin_regions,geom,tags )
SELECT
expanded_admin.name AS name,
expanded_admin.osm_id_set AS osm_id_set,
array_agg( expanded_admin.admin_regions ) AS admin_regions,
expanded_admin.geom AS geom,
expanded_admin.tags AS tags
FROM
(
SELECT
unique_node.name AS name,
array[ unique_node.osm_id ] AS osm_id_set,
unnest( admin_all.osm_id_set ) AS admin_regions,
unique_node.way AS geom,
hstore( unique_node.tags ) AS tags
FROM admin_all,
(
SELECT
planet_osm_point.name AS name,
planet_osm_point.osm_id AS osm_id,
planet_osm_point.way AS way,
planet_osm_point.tags AS tags
FROM planet_osm_point
WHERE
name IS NOT NULL
AND planet_osm_point.place IN %s
AND ST_IsValid( planet_osm_point.way )
) AS unique_node
WHERE
ST_Envelope( admin_all.geom ) && ST_Envelope( unique_node.way )
AND ST_Contains( admin_all.geom, unique_node.way )
) AS expanded_admin
GROUP BY expanded_admin.name, expanded_admin.osm_id_set, expanded_admin.geom, expanded_admin.tags
RETURNING loc_id;
""".format( schema, strAdminLookupTable, schema, strFocusAreaID + '_' + table_point )
# add SQL to list
listSQLInserts.append( (table_point,strSQL,tupleSQLParams,logger,database_pool[table_point], timeout_statement, timeout_overall) )
else :
# point table already exists so no SQL needed
# but we still need to report the existing locations
# as if they were just populated so subsequent code (e.g. geoparse) can process them
dictNewLocations[ strFocusAreaID + '_' + table_point ] = (nRowStart, nRowEnd)
# execute SQL inserts as query as we want results
# use parallel threads to execute SQL simultaneously for increased performance
# with a thread safe queue per table to track location ID results at end
# result = { db_table : (loc_start, loc_end) ... }
queueSQLInsertResults = { table_point : queue.Queue(), table_line : queue.Queue(), table_poly : queue.Queue(), table_admin : queue.Queue() }
if logger != None :
logger.info( 'starting SQL threads' )
listThreads = []
for strTableName in [ table_point, table_line, table_poly, table_admin ] :
# one thread per table type
# to ensure we sequentially add entries to individual tables and therefore get back sequential ranges of p_id's
# which we will report start/end later
# note: non-sequential p_id entries will mean start/end will capture locations NOT part of a focus area (e.g. parallel inserts into admin table interleaving p_id's)
listTupleSQLInsert = []
for tupleSQLInsert in listSQLInserts :
if tupleSQLInsert[0] == strTableName :
listTupleSQLInsert.append( tupleSQLInsert )
if len(listTupleSQLInsert) > 0 :
# add an initial EXCLUSIVE table lock which only allows ACCESS SHARE (i.e. read only) to happen in parallel during table inserts
# so we are process and thread safe for multiple table inserts (e.g. admin poly + admin point)
# this is really important as we will return a start + end loc id and therefore assume we get a continuous loc id result set
# note: lock duration is the transaction so will be released once the commit() occurs (there is no unlock in Postgres as such)
# http://www.postgresql.org/docs/9.1/static/sql-lock.html
strLock = """LOCK {:s}.{:s} IN EXCLUSIVE MODE;""".format( schema, strFocusAreaID + '_' + strTableName )
tupleLock = ( strTableName, strLock, None, logger, database_pool[strTableName], timeout_statement, timeout_overall )
listTupleSQLInsert.insert( 0,tupleLock )
threadBuffer = threading.Thread( target = thread_worker_sql_insert, args = (listTupleSQLInsert,queueSQLInsertResults,logger) )
threadBuffer.daemon = False
threadBuffer.start()
listThreads.append( threadBuffer )
if logger != None :
logger.info( 'waiting for joins' )
for threadEntry in listThreads :
threadEntry.join()
if logger != None :
logger.info( 'join successful' )
for strTableName in [ table_point, table_line, table_poly, table_admin ] :
if queueSQLInsertResults[ strTableName ].empty() == False :
nLocIDStart = -1
nLocIDEnd = -1
while queueSQLInsertResults[ strTableName ].empty() == False :
nLocID = queueSQLInsertResults[ strTableName ].get()
if (nLocIDStart == -1) or (nLocIDStart > nLocID) :
nLocIDStart = nLocID
if (nLocIDEnd == -1) or (nLocIDEnd < nLocID) :
nLocIDEnd = nLocID
dictNewLocations[ strFocusAreaID + '_' + strTableName ] = (nLocIDStart,nLocIDEnd)
if logger != None :
logger.debug( 'LOC IDs = ' + repr(dictNewLocations) )
return dictNewLocations
def execute_preprocessing_global( global_area, database_pool, schema, table_point = 'point', table_line = 'line', table_poly = 'poly', table_admin = 'admin', timeout_statement = 2000000, timeout_overall = 2000000, logger = None ) :
"""
Populates preprocessing tables with locations for a global area (up to admin level 6). This can take a very long time (about 3 days on a 2 GHz CPU), so the default database timeout values are large (e.g. 2000000 seconds).
The database tables will be SQL locked whilst the data import is occurring to ensure new locations are allocated continuous primary key IDs,
allowing a simple result range (start ID and end ID) to be returned as opposed to a long list of IDs for each new location.
This function is process safe but not thread safe as it uses an internal Queue() object to coordinate multiple database cursors and efficiently insert data using parallel SQL inserts.
A global area must be created before individual focus areas can be created since it will contain for each admin region all its super regions, calculated based on PostGIS geometry.
If the table already exists and already has locations then no import will occur (assuming area has already been preprocessed).
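Example call (an illustrative sketch only; the handler pool and schema name are assumptions)::
dictGlobalSpec = { 'focus_area_id' : 'global_cities' }
dictLocIDRange = execute_preprocessing_global( dictGlobalSpec, dictPool, 'reveal' )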
:param dict global_area: global area description
:param dict database_pool: pool of PostgresqlHandler.PostgresqlHandler objects for each of the 4 table types used to execute parallel SQL imports e.g. { 'admin' : PostgresqlHandler.PostgresqlHandler, ... }
:param str schema: Postgres schema name under which tables will be created
:param str table_point: Table name suffix to append to focus area ID to make point table
:param str table_line: Table name suffix to append to focus area ID to make line table
:param str table_poly: Table name suffix to append to focus area ID to make polygon table
:param str table_admin: Table name suffix to append to focus area ID to make admin region table
:param int timeout_statement: number of seconds to allow each SQL statement
:param int timeout_overall: number of seconds total to allow each SQL statement (including retries)
:param logging.Logger logger: logger object (optional)
:return: for each table a tuple with the new location primary key ID range e.g. { 'global_admin' : (nLocIDStart, nLocIDEnd), ... }
:rtype: dict
"""
# check args without defaults
if not isinstance( global_area, dict ) :
raise Exception( 'invalid global_area' )
if not isinstance( database_pool, dict ) :
raise Exception( 'invalid database_pool' )
if not isinstance( schema, str ) :
raise Exception( 'invalid schema' )
if logger != None :
logger.info( 'starting preprocessing of global admin : ' + repr(global_area) )
if not isinstance( global_area, dict ) :
raise Exception( 'global_area not a dict' )
if 'focus_area_id' in global_area :
strFocusAreaID = global_area['focus_area_id']
else :
raise Exception( 'focus_area_id key not found in global_area' )
# tips on OpenGIS SQL
# ST_Collect is faster than ST_Union
# ST_Intersects is faster than ST_Contains
# operator && uses a bounding box and is the fastest of all (but will not match true geometry so use as a pre-filter)
# but read operation docs to understand exactly what each operator is doing!
dictNewLocations = {}
# admin down to 3 (about 1,400 locations)
# admin down to 4 (about 14,000 locations) including England, UK
# admin down to 6 (about 80,000 locations) including Southampton, England, UK
# admin down to 8 (about 300,000 locations) including Eastleigh, Hampshire, UK
# super-region admin level (lookup for super-region assignment)
nMaxAdminLevel = 4
# target admin regions (basic locations in global cities dataset)
nMaxAdminLevelTarget = 6
# make a list like ['1','2',...] for the SQL as admin levels are actually string values on OSM tables
listAdminLevels = list(range(1,nMaxAdminLevel+1))
for nIndex in range(len(listAdminLevels)) :
listAdminLevels[nIndex] = str( listAdminLevels[nIndex] )
# make a list like ['1','2',...] for the SQL as admin levels are actually string values on OSM tables
listAdminLevelsTarget = list(range(1,nMaxAdminLevelTarget+1))
for nIndex in range(len(listAdminLevelsTarget)) :
listAdminLevelsTarget[nIndex] = str( listAdminLevelsTarget[nIndex] )
# if we are asked to download admin regions then use a custom admin level range
tupleAdminLevels = tuple( listAdminLevels )
tupleAdminLevelsTarget = tuple( listAdminLevelsTarget )
# parse query and construct the SQL to use for it
# different types of focus area request are supported and they translate to different SQL queries
listSQLInserts = []
# get the current size of the table.
# if this is not 0 then we simply do no preprocessing and return the min and max primary key ID values
# because the focus area has already been preprocessed
strSQL = """SELECT MIN(loc_id), MAX(loc_id) FROM {:s}.{:s}""".format( schema,strFocusAreaID + '_' + table_admin )
listCount = database_pool[table_admin].execute_sql_query( (strSQL,None), timeout_statement, timeout_overall )
if len(listCount) != 1 :
raise Exception( 'global table row count failed' )
# result = [(nMin,nMax)] or [(None,None)] if no rows present
nRowStart = listCount[0][0]
nRowEnd = listCount[0][1]
bTableEmpty = False
if (nRowStart == None) or (nRowEnd == None) :
bTableEmpty = True
# insert global admin regions ONLY if table is empty
if bTableEmpty == True :
# note: there can be several entries for a single osmid with different ways and we allow them all, as it is far too slow to ST_Buffer them into a single polygon
# this means there will be multiple entries for some (OSMID, name) pairs, each with a different way
# note: check for line admin entries also, as Wales is a line (!) not a polygon (it is not even a closed line, so a polygon cannot be made from it directly)
# make all lines into polygons (adding the start point to ensure each is closed)
# note: some places have a line AND a polygon entry, so we only use the line if there is no polygon available
# note: we deliberately allow the parent id == target id since a top level region (e.g. UK) has no parent, so it would not match the WHERE clause and would otherwise not be returned
# note: using a UNION (not UNION ALL) removes duplicates
# note: use {osm_id_set} not osm_id to keep symmetry with the poly table, where many ways with their own ids are aggregated into a single polygon
# note: use the ST_IsRing filter so only lines that are simple and closed are made into polygons, otherwise we get a GEOMContains error later during ST_Contains
strSQL = """
WITH
admin_poly AS (
SELECT
planet_osm_polygon.osm_id AS osm_id,
planet_osm_polygon.name AS name,
array[ planet_osm_polygon.osm_id ] AS osm_id_set,
planet_osm_polygon.way AS union_way,
hstore( array['name',planet_osm_polygon.name] || array['ref',planet_osm_polygon.ref] || array['admin level',planet_osm_polygon.admin_level] || array['place',planet_osm_polygon.place] || hstore_to_array( planet_osm_polygon.tags ) ) AS tags
FROM planet_osm_polygon
WHERE
planet_osm_polygon.name IS NOT NULL
AND planet_osm_polygon.boundary = 'administrative'
AND planet_osm_polygon.admin_level IN %s
AND ST_IsValid( planet_osm_polygon.way )
),
admin_lines AS (
SELECT
original_line.osm_id AS osm_id,
original_line.name AS name,
original_line.osm_id_set AS osm_id_set,
ST_MakePolygon( original_line.closed_line ) AS union_way,
original_line.tags AS tags
FROM
(
SELECT
planet_osm_line.osm_id AS osm_id,
planet_osm_line.name AS name,
array[ planet_osm_line.osm_id ] AS osm_id_set,
ST_AddPoint( planet_osm_line.way, ST_StartPoint( planet_osm_line.way ) ) AS closed_line,
hstore( array['name',planet_osm_line.name] || array['ref',planet_osm_line.ref] || array['admin level',planet_osm_line.admin_level] || array['place',planet_osm_line.place] || hstore_to_array( planet_osm_line.tags ) ) AS tags
FROM planet_osm_line
WHERE
NOT EXISTS( SELECT planet_osm_line.osm_id FROM admin_poly WHERE planet_osm_line.osm_id = admin_poly.osm_id )
AND planet_osm_line.name IS NOT NULL
AND planet_osm_line.boundary = 'administrative'
AND planet_osm_line.admin_level IN %s
AND ST_IsValid( planet_osm_line.way )
AND ST_NumPoints( planet_osm_line.way ) > 3
) AS original_line
WHERE
ST_IsRing( original_line.closed_line )
),
admin_all AS (
SELECT * FROM admin_poly
UNION
SELECT * FROM admin_lines
),
admin_poly_target AS (
SELECT
planet_osm_polygon.osm_id AS osm_id,
planet_osm_polygon.name AS name,
array[ planet_osm_polygon.osm_id ] AS osm_id_set,
planet_osm_polygon.way AS union_way,
hstore( array['name',planet_osm_polygon.name] || array['ref',planet_osm_polygon.ref] || array['admin level',planet_osm_polygon.admin_level] || array['place',planet_osm_polygon.place] || hstore_to_array( planet_osm_polygon.tags ) ) AS tags
FROM planet_osm_polygon
WHERE
planet_osm_polygon.name IS NOT NULL
AND planet_osm_polygon.boundary = 'administrative'
AND planet_osm_polygon.admin_level IN %s
AND ST_IsValid( planet_osm_polygon.way )
),
admin_lines_target AS (
SELECT
original_line.osm_id AS osm_id,
original_line.name AS name,
original_line.osm_id_set AS osm_id_set,
ST_MakePolygon( original_line.closed_line ) AS union_way,
original_line.tags AS tags
FROM
(
SELECT
planet_osm_line.osm_id AS osm_id,
planet_osm_line.name AS name,
array[ planet_osm_line.osm_id ] AS osm_id_set,
ST_AddPoint( planet_osm_line.way, ST_StartPoint( planet_osm_line.way ) ) AS closed_line,
hstore( array['name',planet_osm_line.name] || array['ref',planet_osm_line.ref] || array['admin level',planet_osm_line.admin_level] || array['place',planet_osm_line.place] || hstore_to_array( planet_osm_line.tags ) ) AS tags
FROM planet_osm_line
WHERE
NOT EXISTS( SELECT planet_osm_line.osm_id FROM admin_poly WHERE planet_osm_line.osm_id = admin_poly.osm_id )
AND planet_osm_line.name IS NOT NULL
AND planet_osm_line.boundary = 'administrative'
AND planet_osm_line.admin_level IN %s
AND ST_IsValid( planet_osm_line.way )
AND ST_NumPoints( planet_osm_line.way ) > 3
) AS original_line
WHERE
ST_IsRing( original_line.closed_line )
),
target_all AS (
SELECT * FROM admin_poly_target
UNION
SELECT * FROM admin_lines_target
)
INSERT INTO {:s}.{:s} ( name,osm_id_set,admin_regions,geom,tags )
SELECT
target_with_super_region.name,
target_with_super_region.osm_id_set,
array_agg( target_with_super_region.super_region ) AS admin_regions,
target_with_super_region.union_way,
target_with_super_region.tags
FROM
(
/* super region = any polygon that contains target polygon */
SELECT
target_all.name AS name,
target_all.osm_id_set AS osm_id_set,
admin_all.osm_id AS super_region,
target_all.union_way AS union_way,
target_all.tags AS tags
FROM target_all, admin_all
WHERE
ST_IsValid( admin_all.union_way )
AND admin_all.union_way && ST_Envelope( target_all.union_way )
AND ST_Contains( admin_all.union_way, target_all.union_way )
UNION
/* super region = target id set so that all targets appear even if they have no super regions and don't match the previous query (e.g. UK) */
SELECT
target_all.name AS name,
target_all.osm_id_set AS osm_id_set,
unnest( target_all.osm_id_set ) AS super_region,
target_all.union_way AS union_way,
target_all.tags AS tags
FROM target_all
) AS target_with_super_region
GROUP BY target_with_super_region.osm_id_set, target_with_super_region.name, target_with_super_region.union_way, target_with_super_region.tags
RETURNING loc_id;
""".format( schema, strFocusAreaID + '_' + table_admin )
tupleSQLParams = ( tupleAdminLevels, tupleAdminLevels, tupleAdminLevelsTarget, tupleAdminLevelsTarget )
listSQLInserts.append( (table_admin,strSQL,tupleSQLParams,logger,database_pool[table_admin], timeout_statement, timeout_overall) )
else :
# global table already exists so no SQL needed
# but we still need to report the existing locations
# as if they were just populated so subsequent code (e.g. geoparse) can process them
dictNewLocations[ strFocusAreaID + '_' + table_admin ] = (nRowStart, nRowEnd)
# execute SQL inserts as query as we want results
# use parallel threads to execute SQL simultaneously for increased performance
# with a thread safe queue per table to track location ID results at end
# result = { db_table : (loc_start, loc_end) ... }
queueSQLInsertResults = { table_point : queue.Queue(), table_line : queue.Queue(), table_poly : queue.Queue(), table_admin : queue.Queue() }
if logger != None :
logger.info( 'starting SQL threads' )
listThreads = []
for strTableName in [ table_point, table_line, table_poly, table_admin ] :
# one thread per table type
		# to ensure we sequentially add entries to individual tables and therefore get back sequential ranges of loc_id's
		# whose start/end we will report later
		# note: non-sequential loc_id entries would mean the start/end range captures locations NOT part of this focus area (e.g. parallel inserts into the admin table interleaving loc_id's)
listTupleSQLInsert = []
for tupleSQLInsert in listSQLInserts :
if tupleSQLInsert[0] == strTableName :
listTupleSQLInsert.append( tupleSQLInsert )
if len(listTupleSQLInsert) > 0 :
# add an initial EXCLUSIVE table lock which only allows ACCESS SHARE (i.e. read only) to happen in parallel during table inserts
# so we are process and thread safe for multiple table inserts (e.g. admin poly + admin point)
# this is really important as we will return a start + end loc id and therefore assume we get a continuous loc id result set
# note: lock duration is the transaction so will be released once the commit() occurs (there is no unlock in Postgres as such)
# http://www.postgresql.org/docs/9.1/static/sql-lock.html
strLock = """LOCK {:s}.{:s} IN EXCLUSIVE MODE;""".format( schema, strFocusAreaID + '_' + strTableName )
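			# e.g. "LOCK mySchema.focus1_admin IN EXCLUSIVE MODE;" (illustrative schema and table names)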
tupleLock = ( strTableName, strLock, None, logger, database_pool[strTableName], timeout_statement, timeout_overall )
listTupleSQLInsert.insert( 0,tupleLock )
threadBuffer = threading.Thread( target = thread_worker_sql_insert, args = (listTupleSQLInsert,queueSQLInsertResults,logger) )
threadBuffer.daemon = False
threadBuffer.start()
listThreads.append( threadBuffer )
if logger != None :
logger.info( 'waiting for joins' )
for threadEntry in listThreads :
threadEntry.join()
if logger != None :
logger.info( 'join successful' )
for strTableName in [ table_point, table_line, table_poly, table_admin ] :
if queueSQLInsertResults[ strTableName ].empty() == False :
nLocIDStart = -1
nLocIDEnd = -1
while queueSQLInsertResults[ strTableName ].empty() == False :
nLocID = queueSQLInsertResults[ strTableName ].get()
if (nLocIDStart == -1) or (nLocIDStart > nLocID) :
nLocIDStart = nLocID
if (nLocIDEnd == -1) or (nLocIDEnd < nLocID) :
nLocIDEnd = nLocID
dictNewLocations[ strFocusAreaID + '_' + strTableName ] = (nLocIDStart,nLocIDEnd)
if logger != None :
logger.debug( 'LOC IDs = ' + repr(dictNewLocations) )
return dictNewLocations
def thread_worker_sql_insert( sql_list, sql_result_queue_dict, logger = None ) :
"""
	Internal thread callback function to preprocess a new area. This function should never be called independently of the execute_preprocessing_global() or execute_preprocessing_focus_area() functions.
	All SQL inserts in the list must be for the same database table as we lock that table for the entire transaction to ensure inserted location IDs are sequential.
:param list sql_list: list of info required to execute sql query e.g. [ ('focus1_admin',sql_query_str,tuple_sql_data,logger,database_handle,timeout_statement,timeout_overall), ... ]
:param dict sql_result_queue_dict: dict of Queue() instances for each table type to store SQL results in a thread safe way e.g. { 'focus1_admin' : Queue(), ... }
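	:param logging.Logger logger: logger object (optional; overridden by the logger in the sql_list entries)

	Illustrative call sketch (the table name, SQL text and database handle below are assumptions for this example)::

		queueResults = { 'focus1_admin' : queue.Queue() }
		listSQL = [ ('focus1_admin', 'INSERT INTO ... RETURNING loc_id;', None, logger, dbHandlerAdmin, 60, 600) ]
		threadWorker = threading.Thread( target = thread_worker_sql_insert, args = (listSQL, queueResults, logger) )
		threadWorker.start()
		threadWorker.join()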
"""
try :
# check args without defaults
if not isinstance( sql_list, list ) :
raise Exception( 'invalid sql_list' )
if not isinstance( sql_result_queue_dict, dict ) :
raise Exception( 'invalid sql_result_queue_dict' )
# nothing to do ?
if len(sql_list) == 0 :
return
# get database details (first entry as they should all be the same)
logger = sql_list[0][3]
dbHandler = sql_list[0][4]
timeout_statement = sql_list[0][5]
timeout_overall = sql_list[0][6]
	# perform inserts sequentially for a specific table so we don't interleave loc_id values
# for example if we have a focus area adding point + polygon data in two parallel threads
# note: all queries will be run in a transaction so this is process safe
listSQL = []
for tupleSQLInsert in sql_list :
listSQL.append( (tupleSQLInsert[1],tupleSQLInsert[2]) )
	# note: use the SQL handler associated with this table as we will run SQL commands in a multi-threaded
	# way for maximum performance. we cannot use multiple SQL cursors as this would add all commands to a single
	# transaction, which is serial and not the same as using 4 separate connections in parallel
if logger != None :
logger.info('start SQL (' + sql_list[0][0] + ' x ' + str(len(sql_list)) + ') ...' )
# reconnect as there seems to be a thread issue with the connection object (maybe connection object is not thread safe?)
dbHandler.reconnect()
# execute SQL
listNewIDs = dbHandler.execute_sql_query_batch( listSQL, timeout_statement, timeout_overall )
if logger != None :
logger.info('... end SQL (' + sql_list[0][0] + ' x ' + str(len(sql_list)) + ') ...' )
# parse SQL result [(loc_id,)...]
for tupleLocID in listNewIDs :
sql_result_queue_dict[ sql_list[0][0] ].put( tupleLocID[0] )
except :
if logger != None :
logger.exception( 'thread_worker_sql_insert() exception (location results will be ignored)' )
def cache_preprocessed_locations( database_handle, location_ids, schema, geospatial_config, timeout_statement = 600, timeout_overall = 600, spatial_filter = None ) :
"""
	Load a set of previously preprocessed locations from the database. The cached location structure returned is used by geoparsepy.geo_parse_lib functions.
:param PostgresqlHandler.PostgresqlHandler database_handle: handle to database object
	:param dict location_ids: for each table the range of location ids to load. A -1 for min or max indicates no min or max limit. Use a range of (-1,-1) for all locations. e.g. { 'focus1_admin' : [nStartID,nEndID], ... }
:param str schema: Postgres schema name under which tables will be created
:param dict geospatial_config: config object returned from a call to geoparsepy.geo_parse_lib.get_geoparse_config()
:param int timeout_statement: number of seconds to allow each SQL statement
:param int timeout_overall: number of seconds total to allow each SQL statement (including retries)
:param str spatial_filter: OpenGIS spatial polygon to use as a spatial filter for returned locations (ST_Intersects used). This is optional and can be None.
:return: list structure containing location information to be used by geoparsepy.geo_parse_lib functions e.g. [loc_id,name,(osm_id,...),(admin_id,...),ST_AsText(geom),{tag:value},(variant_phrase, ...)]
:rtype: list
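	Illustrative usage sketch (the schema, table name and database handle below are assumptions for this example)::

		dictLocationIDs = { 'focus1_admin' : [-1,-1] }
		listCachedLocations = cache_preprocessed_locations( dbHandler, dictLocationIDs, 'public', dictGeospatialConfig )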
"""
# check args without defaults
	if not isinstance( database_handle, soton_corenlppy.PostgresqlHandler.PostgresqlHandler ) :
		raise Exception( 'invalid database_handle' )
if not isinstance( location_ids, dict ) :
raise Exception( 'invalid location_ids' )
	if not isinstance( schema, str ) :
raise Exception( 'invalid schema' )
if not isinstance( geospatial_config, dict ) :
raise Exception( 'invalid geospatial_config' )
if geospatial_config['logger'] != None :
geospatial_config['logger'].info( 'caching locations : ' + repr(location_ids) )
# loop on each source to allow source filtering
# and get data in smaller chunks to avoid running out of memory (e.g. geonames has 3 million entries)
listPlaceData = []
listSQL = []
for strTableName in location_ids :
listLocIDs = location_ids[ strTableName ]
if (spatial_filter != None) and (len(spatial_filter) > 0) :
strSpatialClause = """ST_Intersects( geom, ST_GeomFromText( '{:s}',4326 ) )""".format( spatial_filter )
else :
strSpatialClause = None
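		# e.g. a filter of 'POLYGON((...))' appends "AND ST_Intersects( geom, ST_GeomFromText( 'POLYGON((...))',4326 ) )" to the query below (illustrative)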
# query to get all locations within the range specified
		# note: make a unique location ID by prefixing the table name to the loc_id (e.g. 'flood_ny_admin_123')
# note: hstore_to_matrix() will convert tags::hstore to a 2d array {{key,value},...}
		# result = tuple = (1, 'Saxon Wharf', [-3519617], [-62149, -151304, -127864, -3519617], 'POLYGON((...))', {{'name','Saxon Wharf'},{'type','multipolygon'},{'place','locality'},{'landuse','industrial'},{'source:location','Bing Imagery'}})
if (listLocIDs[0] > -1) and (listLocIDs[1] > -1) :
strSQLQuery = """SELECT concat('{:s}_',loc_id),name,osm_id_set,admin_regions,ST_AsText(geom),hstore_to_matrix(tags) FROM {:s}.{:s} WHERE loc_id >= %s AND loc_id <= %s""".format( strTableName, schema, strTableName )
tupleSQLParams = (listLocIDs[0], listLocIDs[1])
elif (listLocIDs[0] > -1) :
strSQLQuery = """SELECT concat('{:s}_',loc_id),name,osm_id_set,admin_regions,ST_AsText(geom),hstore_to_matrix(tags) FROM {:s}.{:s} WHERE loc_id >= %s""".format( strTableName, schema, strTableName )
tupleSQLParams = (listLocIDs[0],)
elif (listLocIDs[1] > -1) :
strSQLQuery = """SELECT concat('{:s}_',loc_id),name,osm_id_set,admin_regions,ST_AsText(geom),hstore_to_matrix(tags) FROM {:s}.{:s} WHERE loc_id <= %s""".format( strTableName, schema, strTableName )
tupleSQLParams = (listLocIDs[1],)
else :
strSQLQuery = """SELECT concat('{:s}_',loc_id),name,osm_id_set,admin_regions,ST_AsText(geom),hstore_to_matrix(tags) FROM {:s}.{:s}""".format( strTableName, schema, strTableName )
tupleSQLParams = None
# add spatial query if needed
if strSpatialClause != None :
if tupleSQLParams == None :
strSQLQuery = strSQLQuery + ' WHERE ' + strSpatialClause
else :
strSQLQuery = strSQLQuery + ' AND ' + strSpatialClause
# add statement to list for execution
listSQL.append( (strSQLQuery, tupleSQLParams) )
	# note: SQL results are returned as UTF-8 decoded str objects
listRows = database_handle.execute_sql_query_batch( listSQL, timeout_statement, timeout_overall )
listResult = []
if len(listRows) > 0 :
for listLocationData in listRows :
# make SQL tuple result into a list so we can append to it
# we will later make a hashtable linking location tokens to the cached lists row index
listEntry = list( listLocationData )
# make a dict from the 2d array SQL returns for tags
# and replace the 2d array (dict is a lot quicker to search)
# note: ignore OSM NULL values for tags (e.g. ref = NULL)
			# note: REPLACE any _ characters with spaces as OSM is inconsistent and varies between 'admin level' and 'admin_level'
dictTags = {}
for listTagValue in listEntry[5] :
if listTagValue[1] != None :
strTag = listTagValue[0].replace( '_',' ' )
strValue = listTagValue[1]
if not strTag in dictTags :
dictTags[ strTag ] = strValue
listEntry[5] = dictTags
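			# e.g. listEntry[5] = { 'name' : 'Saxon Wharf', 'place' : 'locality' } (illustrative subset of tags)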
			# convert lists of OSM IDs to tuples so we can use them as keys in dictionaries
listEntry[2] = tuple( listEntry[2] )
listEntry[3] = tuple( listEntry[3] )
# get the basic phrase for this location
strPhrase = soton_corenlppy.common_parse_lib.clean_text( listEntry[1], geospatial_config )
			# get any alternative names (e.g. language variants, ref, alt) from the tag data
# and for building/street/admin names do language specific token expansion
listVariantPhrases = geoparsepy.geo_parse_lib.expand_osm_alternate_names( listEntry[2], strPhrase, dictTags, geospatial_config )
# add full set of name variants as an extra dimension at end of location info
listEntry.append( tuple( listVariantPhrases ) )
# add location entry to result set
listResult.append( listEntry )
# DEBUG
#for nIndex in range(0,100) :
# if len(listResult) > nIndex :
# geospatial_config['logger'].info( 'PRE_SET = ' + repr(listResult[nIndex][2]) + ' : ' + repr(listResult[nIndex][6]) )
#for nIndex in range(0,1000) :
# if listResult[nIndex][1] == 'Paris' :
# geospatial_config['logger'].info( 'PRE_PARIS = ' + repr(listResult[nIndex][2]) + ' : ' + repr(listResult[nIndex][6]) + ' : ' + repr(listResult[nIndex][5]) )
# all done
return listResult
def get_point_for_osmid( osm_id, osm_type, geom, database_handle, timeout_statement = 60, timeout_overall = 60 ) :
"""
	Calculate a representative point for an OSM ID. If it is a relation, look up the admin centre, capital or label node members. If it is a node, use that point. Otherwise use the shapely lib to calculate a centroid point for the polygon.
	This function requires a database connection to an OSM planet database.
:param int osm_id: OSM ID of a relation, way or node
:param str osm_type: OSM Type as returned by geoparsepy.geo_parse_lib.calc_OSM_type()
:param str geom: OpenGIS geometry for OSM ID
:param PostgresqlHandler.PostgresqlHandler database_handle: handle to database object connected to OSM database (with tables public.planet_osm_point and public.planet_osm_rels available)
:param int timeout_statement: number of seconds to allow each SQL statement
:param int timeout_overall: number of seconds total to allow each SQL statement (including retries)
:return: coordinates (long, lat) for a point that represents this OSM ID well (e.g. admin centre or centroid)
:rtype: tuple
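	Illustrative usage sketch (the OSM ID, geometry and database handle below are assumptions for this example)::

		tupleLongLat = get_point_for_osmid( -151304, 'admin', 'POLYGON((...))', dbHandler )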
"""
# check args without defaults
	if not isinstance( osm_id, int ) :
		raise Exception( 'invalid osm_id' )
	if not isinstance( osm_type, str ) :
		raise Exception( 'invalid osm_type' )
	if not isinstance( geom, str ) :
		raise Exception( 'invalid geom' )
if not isinstance( database_handle, soton_corenlppy.PostgresqlHandler.PostgresqlHandler ) :
raise Exception( 'invalid database_handle' )
# for admin areas try to find the admin centre node (if there is one)
# note: only rels table has a members column that might include a label or admin centre, ways do not
if osm_type == 'admin' :
if osm_id < 0 :
strSQL = """
select members from public.planet_osm_rels WHERE id = {:s}
""".format( str(-1*osm_id) )
listRows = database_handle.execute_sql_query( (strSQL,None), timeout_statement, timeout_overall )
if len( listRows ) == 0 :
raise Exception( 'relation found without an entry in planet_osm_rels : ' + str(osm_id) )
listMembers = listRows[0][0]
if ('admin_centre' in listMembers) or ('label' in listMembers) or ('capital' in listMembers) :
				# prefer the admin_centre node, falling back to the capital or label node for the point of choice
if listMembers.count( 'admin_centre' ) > 0 :
nIndex = listMembers.index( 'admin_centre' )
elif listMembers.count( 'capital' ) > 0 :
nIndex = listMembers.index( 'capital' )
else :
nIndex = listMembers.index( 'label' )
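				# note: planet_osm_rels.members is a flat text array alternating member OSM ID ('n...','w...','r...') and member role,
				# so the member ID for a role sits immediately before the role string in the list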
if nIndex == 0 :
raise Exception( 'admin_centre or label at index 0 in list (should be impossible) : ' + str(osm_id) )
				# get the admin centre and if it is a node get the point (otherwise fall back to centroid calculation)
strAdminCentreOSMID = listMembers[ nIndex-1 ]
if strAdminCentreOSMID.startswith( 'n' ) :
nAdminCentreOSMID = int( strAdminCentreOSMID[ 1: ] )
strSQL = """
select ST_AsText( way ) from public.planet_osm_point WHERE osm_id = {:s}
""".format( str(nAdminCentreOSMID) )
# note: the long lat in planet_osm_nodes is NOT a proper coordinate and the point way column should be used instead
					# note: it is possible the referenced node is not in the point table as mistakes happen in OSM
listRows = database_handle.execute_sql_query( (strSQL,None), timeout_statement, timeout_overall )
if len( listRows ) != 0 :
strGeom = listRows[0][0]
if not strGeom.startswith('POINT(') :
raise Exception( 'OSM node geom not a point (should be impossible) : ' + str(nAdminCentreOSMID) )
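					# parse the WKT text into (long, lat) floats e.g. 'POINT(-1.4 50.9)' -> (-1.4, 50.9) (illustrative coordinates)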
tupleCoords = strGeom[6:-1].split(' ')
nLong = float( tupleCoords[0] )
nLat = float( tupleCoords[1] )
return ( nLong, nLat )
# otherwise use shapely to calculate a representative point
shapeGeom = shapely.wkt.loads( geom )
if isinstance( shapeGeom, shapely.geometry.point.Point ) :
pointC = shapeGeom
else :
pointC = shapeGeom.centroid
	# check for a failure response as some OSM node points appear to have empty geometries (!)
	if (pointC is None) or (len( pointC.coords ) == 0) :
return None
# return point coordinates (long, lat)
nX = float( pointC.x )
nY = float( pointC.y )
return (nX,nY)