Source code for soton_corenlppy.PostgresqlHandler

# !/usr/bin/env python
# -*- coding: utf-8 -*-

"""
..
	/////////////////////////////////////////////////////////////////////////
	//
	// (c) Copyright University of Southampton IT Innovation, 2012
	//
	// Copyright in this software belongs to IT Innovation Centre of
	// Gamma House, Enterprise Road, Southampton SO16 7NS, UK.
	//
	// This software may not be used, sold, licensed, transferred, copied
	// or reproduced in whole or in part in any manner or form or in or
	// on any media by any person other than in accordance with the terms
	// of the Licence Agreement supplied with the software, or otherwise
	// without the prior written consent of the copyright owners.
	//
	// This software is distributed WITHOUT ANY WARRANTY, without even the
	// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
	// PURPOSE, except where stated in the Licence Agreement supplied with
	// the software.
	//
	// Created By : Stuart E. Middleton
	// Created Date : 2014/03/21
	// Created for Project: REVEAL
	//
	/////////////////////////////////////////////////////////////////////////
	//
	// Dependancies : None
	//
	/////////////////////////////////////////////////////////////////////////
	'''

Postgresql database io handler using psycopg2:
	* see http://initd.org/psycopg/docs/module.html
	* see http://initd.org/psycopg/docs/usage.html#
	* see http://www.postgresql.org/docs/9.1/static/libpq-connect.html
	* see http://www.postgresql.org/docs/current/static/runtime-config.html

note on spotting connection failures in Postgres via psycopg2:
	* connection.closed --> tells us if a connection has been closed using close(). this is useless if the network fails and connection is closed this way
	* connection.status --> tells us about a transaction status (ready, prepared etc). this lets us know if a statements are prepared but not committed but has nothing to say aboyt if the connection is alive or not
	* connection.reset() --> reset transaction not the connection
	* the only sure way to know if a connection is alive is to execute a statement and catch the exception, then reconnect using original details
"""

import os, re, sys, copy, collections, codecs, string, traceback, datetime, time, math, logging
import psycopg2, psycopg2.extras
from soton_corenlppy.SqlHandler import SqlHandler

[docs]class PostgresqlHandler( SqlHandler ) : """ Postgresql handler class to allow clients to execute robust, retry on failure type SQL statements. """ def __init__( self, user, passw, hostname, port, database, timeout_statement = 60 ) : """ constructor - connects to a Postgresql database :param str user: username to access database with :param str passw: password to access database with :param str hostname: hostname of machine with database endpoint :param int port: port number of database endpoint :param int database: database name to connect to :param int timeout_statement: number of seconds to allow each SQL statement """ # compile regex to extract codes from Postgres error messages (see evaluate_sql_error() for details) # match format = :[?????] self.error_code_regex = re.compile('[\:][\[][\S]{5}[\]]') # disconnection phrases self.listDisconnectPhrases = [] self.listDisconnectPhrases.append( 'server closed the connection unexpectedly' ) self.listDisconnectPhrases.append( 'could not connect to server: Connection refused' ) # http://www.postgresql.org/docs/current/static/errcodes-appendix.html#ERRCODES-TABLE # 03000 sql_statement_not_yet_complete # 08000 connection_exception # 08001 sqlclient_unable_to_establish_sqlconnection # 08003 connection_does_not_exist # 08006 connection_failure # 53300 too_many_connections # 57014 query cancelled (i.e. timeout) # 58030 io_error self.listAllowedCodes = [] self.listAllowedCodes.append( '03000' ) self.listAllowedCodes.append( '08000' ) self.listAllowedCodes.append( '08001' ) self.listAllowedCodes.append( '08003' ) self.listAllowedCodes.append( '08006' ) self.listAllowedCodes.append( '53300' ) self.listAllowedCodes.append( '57014' ) self.listAllowedCodes.append( '58030' ) # init self.connection = None dateCurrent = datetime.datetime.now() timeMax = datetime.timedelta( seconds=timeout_statement ) dateExpire = dateCurrent + timeMax bAbort = False strLastError = '' # note: self.bConnClosed is a flag to indicate that the client has closed the connection and it should NOT be reconnedted self.bConnClosed = False # remember connection details for reconnect later self.connDatabase = database self.connUser = user self.connPassword = passw self.connHost = hostname self.connPort = port self.connConnectTimeout = timeout_statement # note: Postgres option statement_timeout is in milliseconds #self.connOptions = '-c statement_timeout=' + str(timeout_statement*1000) # loop trying to execute the given SQL statement # until the expiry time while (bAbort == False) and (datetime.datetime.now() < dateExpire) : try : # remember the open connection self.reconnect() return except psycopg2.Error as err : # check for the special case that the database connection has been terminated (e.g. Postgres reboot) # and needs to be restarted. this is NOT easy in psycopg2 as the standard error codes are embedded # within the message (!) not the err.pgcode of the psycopg2.OperationalError class if self.check_for_disconnect( err ) == True : strLastError = '[' + repr(err.pgcode) + '] ' + repr(err) try : # reconnect and then try again after 1 second self.reconnect() except : # if reconnect fails try again after 1 second (which will fail) and come back here for another reconnect attempt logging.debug( 'reconnect failed (will retry in 1 second)' ) pass else : strLastError = '[' + repr(err.pgcode) + '] ' + repr(err) bAbort = self.evaluate_sql_error( err ) # failure so throw a meaningful exception to help client #KEM line below does not work, as strUser, etc are expected to be global variables! #raise Exception( 'Postgresql connection failed [' + strUser + ':' + strPass + '@' + strHost + ':' + str(nPort) + '/' + strDatabase + ']' + ' : ' + strLastError ) raise Exception( 'Postgresql connection failed [' + user + ':' + passw + '@' + hostname + ':' + str(port) + '/' + database + ']' + ' : ' + strLastError )
[docs] def execute_sql_query( self, query, timeout_statement = 60, timeout_overall = 180 ) : """ execute a single SQL query and return the result (if any) | note : use variables for all data that has escape characters, non-ascii encoding or is simply large as opposed to niavely serializing the data into an SQL query string | note : unicode characters are returned as <str> 8-bit UTF-8 encoded strings. You can get back to <unicode> using strResultString.decode( 'utf-8' ). :param tuple query: SQL query tuple to execute to get config JSON object. e.g. ( "SELECT var FROM mytable WHERE var = %s", ('match_value',) ). if there is no data part to the query None can be provided e.g. ( "SELECT * FROM mytable", None ) :param int timeout_statement: number of seconds to allow each SQL statement :param int timeout_overall: number of seconds total to allow each SQL statement (including retries) :return: Python list with result row data OR empty list (no data) :rtype: list """ # check args if query == None : raise Exception( 'SQL query failed : null sql statement' ) if not isinstance( query, tuple ) : raise Exception( 'SQL query failed : sql statement not a tuple' ) if len(query) != 2 : raise Exception( 'SQL query failed : sql statement not a tuple of length 2' ) if not isinstance( query[0], str ) : raise Exception( 'SQL query failed : sql statement query string not type str' ) if (query[1] != None) and (not isinstance( query[1], tuple )) : raise Exception( 'SQL query failed : sql statement variable not tuple or None' ) # setup timings dateCurrent = datetime.datetime.now() timeMax = datetime.timedelta( seconds=timeout_overall ) dateExpire = dateCurrent + timeMax bAbort = False strLastError = '' # check connection if self.connection == None : raise Exception( 'SQL query failed : connection handle None - construction must have failed' ) # loop trying to execute the given SQL statement # until the expiry time while (bAbort == False) and (datetime.datetime.now() < dateExpire) : cursor = None try : # if this connection is part-way through a transaction reset it as this # indicates a previous SQL statement failure if self.connection.status != psycopg2.extensions.STATUS_READY : # rollback failed transaction and get ready for a new one self.connection.reset() cursor = self.connection.cursor() if query[1] != None : cursor.execute( query[0], query[1] ) else : cursor.execute( query[0] ) # postgres needs a commit even for SELECT self.connection.commit() # get results listResult = cursor.fetchall() # all done cursor.close() # success return listResult except psycopg2.Error as err : # check for the special case that the database connection has been terminated (e.g. Postgres reboot) # and needs to be restarted. this is NOT easy in psycopg2 as the standard error codes are embedded # within the message (!) not the err.pgcode of the psycopg2.OperationalError class if self.check_for_disconnect( err ) == True : strLastError = '[' + repr(err.pgcode) + '] ' + repr(err) try : # reconnect and then try again after 1 second self.reconnect() except : # if reconnect fails try again after 1 second (which will fail) and come back here for another reconnect attempt logging.debug( 'reconnect failed (will retry in 1 second)' ) pass else : strLastError = '[' + repr(err.pgcode) + '] ' + repr(err) bAbort = self.evaluate_sql_error( err ) # failure so close cursor before retry loop if cursor != None : cursor.close() # sleep for 1 second before retrying time.sleep( 1 ) # failure raise Exception( 'SQL query failed (timeout retrying) : ' + strLastError + ' : ' + query[0] )
[docs] def execute_sql_query_batch( self, query_list, timeout_statement = 60, timeout_overall = 180 ) : """ execute a batch of SQL queries and return the concatenated result (if any) | note : non-query statements ARE allowed here (e.g. LOCK) to allow INSERT or LOCK in the transaction before a query | note : use variables for all data that has escape characters, non-ascii encoding or is simply large as opposed to niavely serializing the data into an SQL query string | note : unicode characters are returned as <str> 8-bit UTF-8 encoded strings. You can get back to <unicode> using strResultString.decode( 'utf-8' ). :param list query_list: list of SQL query tuples to execute to get config JSON object. e.g. [ ( "SELECT var FROM mytable WHERE var = %s", ('match_value',) ), ... ]. if there is no data part to the query None can be provided e.g. [( "SELECT * FROM mytable", None ), ... ] :param int timeout_statement: number of seconds to allow each SQL statement :param int timeout_overall: number of seconds total to allow each SQL statement (including retries) :return: Python list with result row data OR empty list (no data) :rtype: list """ # check args if (query_list == None) or len(query_list) == 0 : raise Exception( 'SQL query failed : null or empty sql statement list' ) # setup timings dateCurrent = datetime.datetime.now() timeMax = datetime.timedelta( seconds=timeout_overall ) dateExpire = dateCurrent + timeMax bAbort = False strLastError = '' # check connection if self.connection == None : raise Exception( 'SQL query failed : connection handle None - construction must have failed' ) # loop trying to execute the given SQL statement # until the expiry time nStatementIndex = 0 while (bAbort == False) and (datetime.datetime.now() < dateExpire) : cursor = None listResult = [] try : # if this connection is part-way through a transaction reset it as this # indicates a previous SQL statement failure if self.connection.status != psycopg2.extensions.STATUS_READY : # rollback failed transaction and get ready for a new one self.connection.reset() cursor = self.connection.cursor() # note: psycopg2 will open a transaction on the first command for tupleStatement in query_list : if not isinstance( tupleStatement, tuple ) : raise Exception( 'SQL query failed : sql statement not a tuple' ) if len(tupleStatement) != 2 : raise Exception( 'SQL query failed : sql statement not a tuple of length 2' ) if not isinstance( tupleStatement[0], str ) : raise Exception( 'SQL query failed : sql statement query string not type str' ) if (tupleStatement[1] != None) and (not isinstance( tupleStatement[1], tuple )) : raise Exception( 'SQL query failed : sql statement variable not tuple or None' ) if tupleStatement[1] != None : cursor.execute( tupleStatement[0], tupleStatement[1] ) else : cursor.execute( tupleStatement[0] ) # get results (extend the aggregated result list) # note: if there are no results this causes an error (!?!) and it looks like the only way # to handle this is to catch the psycopg2.ProgrammingError exception try : listResult.extend( cursor.fetchall() ) except psycopg2.ProgrammingError : # no results (e.g. if its a LOCK statement of something) pass # commit any changes that may have been made by the query self.connection.commit() # all done cursor.close() # success return listResult except psycopg2.Error as err : # check for the special case that the database connection has been terminated (e.g. Postgres reboot) # and needs to be restarted. this is NOT easy in psycopg2 as the standard error codes are embedded # within the message (!) not the err.pgcode of the psycopg2.OperationalError class if self.check_for_disconnect( err ) == True : strLastError = '[' + repr(err.pgcode) + '] ' + repr(err) try : # reconnect and then try again after 1 second self.reconnect() except : # if reconnect fails try again after 1 second (which will fail) and come back here for another reconnect attempt logging.debug( 'reconnect failed (will retry in 1 second)' ) pass else : strLastError = '[' + repr(err.pgcode) + '] ' + repr(err) bAbort = self.evaluate_sql_error( err ) # failure so close cursor before retry loop if cursor != None : cursor.close() # sleep for 1 second before retrying time.sleep( 1 ) # failure raise Exception( 'SQL query failed (timeout retrying) : ' + strLastError + ' : ' + tupleStatement[0] )
[docs] def execute_sql_statement( self, statement_list, timeout_statement = 60, timeout_overall = 180 ) : """ execute a set of SQL statements (insert, create etc) with no result expected | note : use variables for all data that has escape characters, non-ascii encoding or is simply large as opposed to niavely serializing the data into an SQL query string :param list statement_list: list of SQL statements in tuple form to execute in a single commit e.g. [ ( "INSERT INTO mytable VALUES(%s,%s)", ('value1','value2') ), ... ]. if there is no data part to the query None can be provided e.g. ( "INSERT INTO mytable VALUES(1)", None ) :param int timeout_statement: number of seconds to allow each SQL statement :param int timeout_overall: number of seconds total to allow each SQL statement (including retries) """ # check args if statement_list == None : raise Exception( 'SQL statement failed : null sql statement list' ) if not isinstance( statement_list, list ) : raise Exception( 'SQL statement failed : sql statement list not a list' ) # check connection if self.connection == None : raise Exception( 'SQL query failed : connection handle None - construction must have failed' ) nStatementIndex = 0 # setup timings dateCurrent = datetime.datetime.now() timeMax = datetime.timedelta( seconds=timeout_overall ) dateExpire = dateCurrent + timeMax bAbort = False strLastError = '' # get a cursor to execute the statement set while (bAbort == False) and (datetime.datetime.now() < dateExpire) : cursor = None try : # if this connection is part-way through a transaction reset it as this # indicates a previous SQL statement failure if self.connection.status != psycopg2.extensions.STATUS_READY : # rollback failed transaction and get ready for a new one self.connection.reset() # get a cursor cursor = self.connection.cursor() # loop trying to execute the given SQL statement adding them to SQL transaction for nStatementIndex in range(len(statement_list)) : if not isinstance( statement_list[nStatementIndex], tuple ) : raise Exception( 'SQL statement @ index ' + str(nStatementIndex) + ' not a tuple' ) if not isinstance( statement_list[nStatementIndex][0], str ) : raise Exception( 'SQL statement[0] @ index ' + str(nStatementIndex) + ' not a str' ) if (statement_list[nStatementIndex][1] != None) and (not isinstance( statement_list[nStatementIndex][1], tuple )) : raise Exception( 'SQL statement[1] @ index ' + str(nStatementIndex) + ' not tuple or None' ) if statement_list[nStatementIndex][1] != None : # [ sql_statement_with_format, tuple_data_objects ] cursor.execute( statement_list[nStatementIndex][0], statement_list[nStatementIndex][1] ) else : # just_sql_statement cursor.execute( statement_list[nStatementIndex][0] ) # commit statements and close cursor self.connection.commit() cursor.close() # success return except psycopg2.Error as err : # check for the special case that the database connection has been terminated (e.g. Postgres reboot) # and needs to be restarted. this is NOT easy in psycopg2 as the standard error codes are embedded # within the message (!) not the err.pgcode of the psycopg2.OperationalError class if self.check_for_disconnect( err ) == True : strLastError = '[' + repr(err.pgcode) + '] ' + repr(err) try : # reconnect and then try again after 1 second self.reconnect() except : # if reconnect fails try again after 1 second (which will fail) and come back here for another reconnect attempt logging.debug( 'reconnect failed (will retry in 1 second)' ) pass else : strLastError = '[' + repr(err.pgcode) + '] ' + repr(err) bAbort = self.evaluate_sql_error( err ) # failure so close cursor before retry loop if cursor != None : cursor.close() # sleep for 1 second before retrying time.sleep( 1 ) # timeout raise Exception( 'SQL statement failed (timeout retrying) :' + strLastError )
[docs] def evaluate_sql_error( self, err ) : """ evaluate an sql error to see if we need to abort :param psycopg2.Error err: SQL exception object :return: True if the SQL query or statement can be retried (e.g. network connection error that might go away), or False if it should be aborted (e.g. SQL syntax error) :rtype: bool """ # get possible error codes from the message text as postgres python libs do NOT set err.pgcode # as often as they should (!) strExtractedCode = None # check for OperationalError as this is thrown on database connection failure so a retry is worthwhile listCodes = self.error_code_regex.findall( repr(err) ) if len(listCodes) > 0 : strExtractedCode = listCodes[0][2:7] logging.debug('extracted SQL error code = {' + str(strExtractedCode) + '}' ) # check pgcode first (might be None) if (err.pgcode) != None and (err.pgcode in self.listAllowedCodes) : # allow a retry logging.debug( 'SQL statement failed (will retry) : [' + repr(err.pgcode) + '] ' + repr(err) ) return False else : if strExtractedCode != None : if strExtractedCode in self.listAllowedCodes : # allow a retry logging.debug( 'SQL statement failed (will retry) : [' + strExtractedCode + '] ' + repr(err) ) return False # abort as we have an unknown error return True
[docs] def check_for_disconnect( self, err ) : """ special connection failure parsing :param psycopg2.Error err: SQL exception object :return: True if the error indicates we are disconnected :rtype: bool """ # check for disconnection message phrases if isinstance( err, psycopg2.OperationalError ) : strMessage = repr( err.pgerror ) for nTestIndex in range(len(self.listDisconnectPhrases)) : if strMessage.count( self.listDisconnectPhrases[nTestIndex] ) > 0 : return True return False
[docs] def reconnect( self ) : """ reconnect using previously cached connection params | note : UTF8 client encoding is set for all connections """ if self.bConnClosed == False : # establish a connection #self.connection = psycopg2.connect( database=self.connDatabase, user=self.connUser, password=self.connPassword, host=self.connHost, port=self.connPort, connect_timeout=self.connConnectTimeout ,options=self.connOptions ) self.connection = psycopg2.connect( database=self.connDatabase, user=self.connUser, password=self.connPassword, host=self.connHost, port=self.connPort, connect_timeout=self.connConnectTimeout ) self.connection.set_client_encoding( 'utf8' ) # register hstore support (turn it on for all cursors globally) # this allows python dict to be mapped to hstore psycopg2.extras.register_hstore( conn_or_curs = self.connection, globally = True, oid=None, array_oid=None )
[docs] def close( self ) : """ close connecton and flag it as closed to prevent restarts | note : once flagged closed the connection will not be restarted using reconnect() """ self.bConnClosed = True if self.connection != None : self.connection.close()