Spaces:
Paused
Paused
| """ Unit tests version 2.6.1.0 for adodbapi""" | |
| """ | |
| adodbapi - A python DB API 2.0 interface to Microsoft ADO | |
| Copyright (C) 2002 Henrik Ekelund | |
| This library is free software; you can redistribute it and/or | |
| modify it under the terms of the GNU Lesser General Public | |
| License as published by the Free Software Foundation; either | |
| version 2.1 of the License, or (at your option) any later version. | |
| This library is distributed in the hope that it will be useful, | |
| but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
| Lesser General Public License for more details. | |
| You should have received a copy of the GNU Lesser General Public | |
| License along with this library; if not, write to the Free Software | |
| Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA | |
| Updates by Vernon Cole | |
| """ | |
| import copy | |
| import datetime | |
| import decimal | |
| import random | |
| import string | |
| import sys | |
| import unittest | |
| try: | |
| import win32com.client | |
| win32 = True | |
| except ImportError: | |
| win32 = False | |
| # run the configuration module. | |
| import adodbapitestconfig as config # will set sys.path to find correct version of adodbapi | |
| # in our code below, all our switches are from config.whatever | |
| import tryconnection | |
| import adodbapi | |
| import adodbapi.apibase as api | |
| try: | |
| import adodbapi.ado_consts as ado_consts | |
| except ImportError: # we are doing a shortcut import as a module -- so | |
| try: | |
| import ado_consts | |
| except ImportError: | |
| from adodbapi import ado_consts | |
| def str2bytes(sval): | |
| return sval.encode("latin1") | |
| long = int | |
| def randomstring(length): | |
| return "".join([random.choice(string.ascii_letters) for n in range(32)]) | |
| class CommonDBTests(unittest.TestCase): | |
| "Self contained super-simple tests in easy syntax, should work on everything between mySQL and Oracle" | |
| def setUp(self): | |
| self.engine = "unknown" | |
| def getEngine(self): | |
| return self.engine | |
| def getConnection(self): | |
| raise NotImplementedError # "This method must be overriden by a subclass" | |
| def getCursor(self): | |
| return self.getConnection().cursor() | |
| def testConnection(self): | |
| crsr = self.getCursor() | |
| assert crsr.__class__.__name__ == "Cursor" | |
| def testErrorHandlerInherits(self): | |
| if not self.remote: | |
| conn = self.getConnection() | |
| mycallable = lambda connection, cursor, errorclass, errorvalue: 1 | |
| conn.errorhandler = mycallable | |
| crsr = conn.cursor() | |
| assert ( | |
| crsr.errorhandler == mycallable | |
| ), "Error handler on crsr should be same as on connection" | |
| def testDefaultErrorHandlerConnection(self): | |
| if not self.remote: | |
| conn = self.getConnection() | |
| del conn.messages[:] | |
| try: | |
| conn.close() | |
| conn.commit() # Should not be able to use connection after it is closed | |
| except: | |
| assert len(conn.messages) == 1 | |
| assert len(conn.messages[0]) == 2 | |
| assert conn.messages[0][0] == api.ProgrammingError | |
| def testOwnErrorHandlerConnection(self): | |
| if self.remote: # ToDo: use "skip" | |
| return | |
| mycallable = ( | |
| lambda connection, cursor, errorclass, errorvalue: 1 | |
| ) # does not raise anything | |
| conn = self.getConnection() | |
| conn.errorhandler = mycallable | |
| conn.close() | |
| conn.commit() # Should not be able to use connection after it is closed | |
| assert len(conn.messages) == 0 | |
| conn.errorhandler = None # This should bring back the standard error handler | |
| try: | |
| conn.close() | |
| conn.commit() # Should not be able to use connection after it is closed | |
| except: | |
| pass | |
| # The Standard errorhandler appends error to messages attribute | |
| assert ( | |
| len(conn.messages) > 0 | |
| ), "Setting errorhandler to none should bring back the standard error handler" | |
| def testDefaultErrorHandlerCursor(self): | |
| crsr = self.getConnection().cursor() | |
| if not self.remote: | |
| del crsr.messages[:] | |
| try: | |
| crsr.execute("SELECT abbtytddrf FROM dasdasd") | |
| except: | |
| assert len(crsr.messages) == 1 | |
| assert len(crsr.messages[0]) == 2 | |
| assert crsr.messages[0][0] == api.DatabaseError | |
| def testOwnErrorHandlerCursor(self): | |
| if self.remote: # ToDo: should be a "skip" | |
| return | |
| mycallable = ( | |
| lambda connection, cursor, errorclass, errorvalue: 1 | |
| ) # does not raise anything | |
| crsr = self.getConnection().cursor() | |
| crsr.errorhandler = mycallable | |
| crsr.execute("SELECT abbtytddrf FROM dasdasd") | |
| assert len(crsr.messages) == 0 | |
| crsr.errorhandler = None # This should bring back the standard error handler | |
| try: | |
| crsr.execute("SELECT abbtytddrf FROM dasdasd") | |
| except: | |
| pass | |
| # The Standard errorhandler appends error to messages attribute | |
| assert ( | |
| len(crsr.messages) > 0 | |
| ), "Setting errorhandler to none should bring back the standard error handler" | |
| def testUserDefinedConversions(self): | |
| if self.remote: ## Todo: should be a "skip" | |
| return | |
| try: | |
| duplicatingConverter = lambda aStringField: aStringField * 2 | |
| assert duplicatingConverter("gabba") == "gabbagabba" | |
| self.helpForceDropOnTblTemp() | |
| conn = self.getConnection() | |
| # the variantConversions attribute should not exist on a normal connection object | |
| self.assertRaises(AttributeError, lambda x: conn.variantConversions[x], [2]) | |
| if not self.remote: | |
| # create a variantConversions attribute on the connection | |
| conn.variantConversions = copy.copy(api.variantConversions) | |
| crsr = conn.cursor() | |
| tabdef = ( | |
| "CREATE TABLE xx_%s (fldData VARCHAR(100) NOT NULL, fld2 VARCHAR(20))" | |
| % config.tmp | |
| ) | |
| crsr.execute(tabdef) | |
| crsr.execute( | |
| "INSERT INTO xx_%s(fldData,fld2) VALUES('gabba','booga')" | |
| % config.tmp | |
| ) | |
| crsr.execute( | |
| "INSERT INTO xx_%s(fldData,fld2) VALUES('hey','yo')" % config.tmp | |
| ) | |
| # change converter for ALL adoStringTypes columns | |
| conn.variantConversions[api.adoStringTypes] = duplicatingConverter | |
| crsr.execute( | |
| "SELECT fldData,fld2 FROM xx_%s ORDER BY fldData" % config.tmp | |
| ) | |
| rows = crsr.fetchall() | |
| row = rows[0] | |
| self.assertEqual(row[0], "gabbagabba") | |
| row = rows[1] | |
| self.assertEqual(row[0], "heyhey") | |
| self.assertEqual(row[1], "yoyo") | |
| upcaseConverter = lambda aStringField: aStringField.upper() | |
| assert upcaseConverter("upThis") == "UPTHIS" | |
| # now use a single column converter | |
| rows.converters[1] = upcaseConverter # convert second column | |
| self.assertEqual(row[0], "heyhey") # first will be unchanged | |
| self.assertEqual(row[1], "YO") # second will convert to upper case | |
| finally: | |
| try: | |
| del conn.variantConversions # Restore the default | |
| except: | |
| pass | |
| self.helpRollbackTblTemp() | |
| def testUserDefinedConversionForExactNumericTypes(self): | |
| # variantConversions is a dictionary of conversion functions | |
| # held internally in adodbapi.apibase | |
| # | |
| # !!! this test intentionally alters the value of what should be constant in the module | |
| # !!! no new code should use this example, to is only a test to see that the | |
| # !!! deprecated way of doing this still works. (use connection.variantConversions) | |
| # | |
| if not self.remote and sys.version_info < (3, 0): ### Py3 need different test | |
| oldconverter = adodbapi.variantConversions[ | |
| ado_consts.adNumeric | |
| ] # keep old function to restore later | |
| # By default decimal and "numbers" are returned as decimals. | |
| # Instead, make numbers return as floats | |
| try: | |
| adodbapi.variantConversions[ado_consts.adNumeric] = adodbapi.cvtFloat | |
| self.helpTestDataType( | |
| "decimal(18,2)", "NUMBER", 3.45, compareAlmostEqual=1 | |
| ) | |
| self.helpTestDataType( | |
| "numeric(18,2)", "NUMBER", 3.45, compareAlmostEqual=1 | |
| ) | |
| # now return strings | |
| adodbapi.variantConversions[ado_consts.adNumeric] = adodbapi.cvtString | |
| self.helpTestDataType("numeric(18,2)", "NUMBER", "3.45") | |
| # now a completly weird user defined convertion | |
| adodbapi.variantConversions[ado_consts.adNumeric] = ( | |
| lambda x: "!!This function returns a funny unicode string %s!!" % x | |
| ) | |
| self.helpTestDataType( | |
| "numeric(18,2)", | |
| "NUMBER", | |
| "3.45", | |
| allowedReturnValues=[ | |
| "!!This function returns a funny unicode string 3.45!!" | |
| ], | |
| ) | |
| finally: | |
| # now reset the converter to its original function | |
| adodbapi.variantConversions[ | |
| ado_consts.adNumeric | |
| ] = oldconverter # Restore the original convertion function | |
| def helpTestDataType( | |
| self, | |
| sqlDataTypeString, | |
| DBAPIDataTypeString, | |
| pyData, | |
| pyDataInputAlternatives=None, | |
| compareAlmostEqual=None, | |
| allowedReturnValues=None, | |
| ): | |
| self.helpForceDropOnTblTemp() | |
| conn = self.getConnection() | |
| crsr = conn.cursor() | |
| tabdef = ( | |
| """ | |
| CREATE TABLE xx_%s ( | |
| fldId integer NOT NULL, | |
| fldData """ | |
| % config.tmp | |
| + sqlDataTypeString | |
| + ")\n" | |
| ) | |
| crsr.execute(tabdef) | |
| # Test Null values mapped to None | |
| crsr.execute("INSERT INTO xx_%s (fldId) VALUES (1)" % config.tmp) | |
| crsr.execute("SELECT fldId,fldData FROM xx_%s" % config.tmp) | |
| rs = crsr.fetchone() | |
| self.assertEqual(rs[1], None) # Null should be mapped to None | |
| assert rs[0] == 1 | |
| # Test description related | |
| descTuple = crsr.description[1] | |
| assert descTuple[0] in ["fldData", "flddata"], 'was "%s" expected "%s"' % ( | |
| descTuple[0], | |
| "fldData", | |
| ) | |
| if DBAPIDataTypeString == "STRING": | |
| assert descTuple[1] == api.STRING, 'was "%s" expected "%s"' % ( | |
| descTuple[1], | |
| api.STRING.values, | |
| ) | |
| elif DBAPIDataTypeString == "NUMBER": | |
| assert descTuple[1] == api.NUMBER, 'was "%s" expected "%s"' % ( | |
| descTuple[1], | |
| api.NUMBER.values, | |
| ) | |
| elif DBAPIDataTypeString == "BINARY": | |
| assert descTuple[1] == api.BINARY, 'was "%s" expected "%s"' % ( | |
| descTuple[1], | |
| api.BINARY.values, | |
| ) | |
| elif DBAPIDataTypeString == "DATETIME": | |
| assert descTuple[1] == api.DATETIME, 'was "%s" expected "%s"' % ( | |
| descTuple[1], | |
| api.DATETIME.values, | |
| ) | |
| elif DBAPIDataTypeString == "ROWID": | |
| assert descTuple[1] == api.ROWID, 'was "%s" expected "%s"' % ( | |
| descTuple[1], | |
| api.ROWID.values, | |
| ) | |
| elif DBAPIDataTypeString == "UUID": | |
| assert descTuple[1] == api.OTHER, 'was "%s" expected "%s"' % ( | |
| descTuple[1], | |
| api.OTHER.values, | |
| ) | |
| else: | |
| raise NotImplementedError # "DBAPIDataTypeString not provided" | |
| # Test data binding | |
| inputs = [pyData] | |
| if pyDataInputAlternatives: | |
| inputs.extend(pyDataInputAlternatives) | |
| inputs = set(inputs) # removes redundant string==unicode tests | |
| fldId = 1 | |
| for inParam in inputs: | |
| fldId += 1 | |
| try: | |
| crsr.execute( | |
| "INSERT INTO xx_%s (fldId,fldData) VALUES (?,?)" % config.tmp, | |
| (fldId, inParam), | |
| ) | |
| except: | |
| if self.remote: | |
| for message in crsr.messages: | |
| print(message) | |
| else: | |
| conn.printADOerrors() | |
| raise | |
| crsr.execute( | |
| "SELECT fldData FROM xx_%s WHERE ?=fldID" % config.tmp, [fldId] | |
| ) | |
| rs = crsr.fetchone() | |
| if allowedReturnValues: | |
| allowedTypes = tuple([type(aRV) for aRV in allowedReturnValues]) | |
| assert isinstance( | |
| rs[0], allowedTypes | |
| ), 'result type "%s" must be one of %s' % (type(rs[0]), allowedTypes) | |
| else: | |
| assert isinstance( | |
| rs[0], type(pyData) | |
| ), 'result type "%s" must be instance of %s' % ( | |
| type(rs[0]), | |
| type(pyData), | |
| ) | |
| if compareAlmostEqual and DBAPIDataTypeString == "DATETIME": | |
| iso1 = adodbapi.dateconverter.DateObjectToIsoFormatString(rs[0]) | |
| iso2 = adodbapi.dateconverter.DateObjectToIsoFormatString(pyData) | |
| self.assertEqual(iso1, iso2) | |
| elif compareAlmostEqual: | |
| s = float(pyData) | |
| v = float(rs[0]) | |
| assert ( | |
| abs(v - s) / s < 0.00001 | |
| ), "Values not almost equal recvd=%s, expected=%f" % (rs[0], s) | |
| else: | |
| if allowedReturnValues: | |
| ok = False | |
| self.assertTrue( | |
| rs[0] in allowedReturnValues, | |
| 'Value "%s" not in %s' % (repr(rs[0]), allowedReturnValues), | |
| ) | |
| else: | |
| self.assertEqual( | |
| rs[0], | |
| pyData, | |
| 'Values are not equal recvd="%s", expected="%s"' | |
| % (rs[0], pyData), | |
| ) | |
| def testDataTypeFloat(self): | |
| self.helpTestDataType("real", "NUMBER", 3.45, compareAlmostEqual=True) | |
| self.helpTestDataType("float", "NUMBER", 1.79e37, compareAlmostEqual=True) | |
| def testDataTypeDecmal(self): | |
| self.helpTestDataType( | |
| "decimal(18,2)", | |
| "NUMBER", | |
| 3.45, | |
| allowedReturnValues=["3.45", "3,45", decimal.Decimal("3.45")], | |
| ) | |
| self.helpTestDataType( | |
| "numeric(18,2)", | |
| "NUMBER", | |
| 3.45, | |
| allowedReturnValues=["3.45", "3,45", decimal.Decimal("3.45")], | |
| ) | |
| self.helpTestDataType( | |
| "decimal(20,2)", | |
| "NUMBER", | |
| 444444444444444444, | |
| allowedReturnValues=[ | |
| "444444444444444444.00", | |
| "444444444444444444,00", | |
| decimal.Decimal("444444444444444444"), | |
| ], | |
| ) | |
| if self.getEngine() == "MSSQL": | |
| self.helpTestDataType( | |
| "uniqueidentifier", | |
| "UUID", | |
| "{71A4F49E-39F3-42B1-A41E-48FF154996E6}", | |
| allowedReturnValues=["{71A4F49E-39F3-42B1-A41E-48FF154996E6}"], | |
| ) | |
| def testDataTypeMoney(self): # v2.1 Cole -- use decimal for money | |
| if self.getEngine() == "MySQL": | |
| self.helpTestDataType( | |
| "DECIMAL(20,4)", "NUMBER", decimal.Decimal("-922337203685477.5808") | |
| ) | |
| elif self.getEngine() == "PostgreSQL": | |
| self.helpTestDataType( | |
| "money", | |
| "NUMBER", | |
| decimal.Decimal("-922337203685477.5808"), | |
| compareAlmostEqual=True, | |
| allowedReturnValues=[ | |
| -922337203685477.5808, | |
| decimal.Decimal("-922337203685477.5808"), | |
| ], | |
| ) | |
| else: | |
| self.helpTestDataType("smallmoney", "NUMBER", decimal.Decimal("214748.02")) | |
| self.helpTestDataType( | |
| "money", "NUMBER", decimal.Decimal("-922337203685477.5808") | |
| ) | |
| def testDataTypeInt(self): | |
| if self.getEngine() != "PostgreSQL": | |
| self.helpTestDataType("tinyint", "NUMBER", 115) | |
| self.helpTestDataType("smallint", "NUMBER", -32768) | |
| if self.getEngine() not in ["ACCESS", "PostgreSQL"]: | |
| self.helpTestDataType( | |
| "bit", "NUMBER", 1 | |
| ) # Does not work correctly with access | |
| if self.getEngine() in ["MSSQL", "PostgreSQL"]: | |
| self.helpTestDataType( | |
| "bigint", | |
| "NUMBER", | |
| 3000000000, | |
| allowedReturnValues=[3000000000, int(3000000000)], | |
| ) | |
| self.helpTestDataType("int", "NUMBER", 2147483647) | |
| def testDataTypeChar(self): | |
| for sqlDataType in ("char(6)", "nchar(6)"): | |
| self.helpTestDataType( | |
| sqlDataType, | |
| "STRING", | |
| "spam ", | |
| allowedReturnValues=["spam", "spam", "spam ", "spam "], | |
| ) | |
| def testDataTypeVarChar(self): | |
| if self.getEngine() == "MySQL": | |
| stringKinds = ["varchar(10)", "text"] | |
| elif self.getEngine() == "PostgreSQL": | |
| stringKinds = ["varchar(10)", "text", "character varying"] | |
| else: | |
| stringKinds = [ | |
| "varchar(10)", | |
| "nvarchar(10)", | |
| "text", | |
| "ntext", | |
| ] # ,"varchar(max)"] | |
| for sqlDataType in stringKinds: | |
| self.helpTestDataType(sqlDataType, "STRING", "spam", ["spam"]) | |
| def testDataTypeDate(self): | |
| if self.getEngine() == "PostgreSQL": | |
| dt = "timestamp" | |
| else: | |
| dt = "datetime" | |
| self.helpTestDataType( | |
| dt, "DATETIME", adodbapi.Date(2002, 10, 28), compareAlmostEqual=True | |
| ) | |
| if self.getEngine() not in ["MySQL", "PostgreSQL"]: | |
| self.helpTestDataType( | |
| "smalldatetime", | |
| "DATETIME", | |
| adodbapi.Date(2002, 10, 28), | |
| compareAlmostEqual=True, | |
| ) | |
| if tag != "pythontime" and self.getEngine() not in [ | |
| "MySQL", | |
| "PostgreSQL", | |
| ]: # fails when using pythonTime | |
| self.helpTestDataType( | |
| dt, | |
| "DATETIME", | |
| adodbapi.Timestamp(2002, 10, 28, 12, 15, 1), | |
| compareAlmostEqual=True, | |
| ) | |
| def testDataTypeBinary(self): | |
| binfld = str2bytes("\x07\x00\xE2\x40*") | |
| arv = [binfld, adodbapi.Binary(binfld), bytes(binfld)] | |
| if self.getEngine() == "PostgreSQL": | |
| self.helpTestDataType( | |
| "bytea", "BINARY", adodbapi.Binary(binfld), allowedReturnValues=arv | |
| ) | |
| else: | |
| self.helpTestDataType( | |
| "binary(5)", "BINARY", adodbapi.Binary(binfld), allowedReturnValues=arv | |
| ) | |
| self.helpTestDataType( | |
| "varbinary(100)", | |
| "BINARY", | |
| adodbapi.Binary(binfld), | |
| allowedReturnValues=arv, | |
| ) | |
| if self.getEngine() != "MySQL": | |
| self.helpTestDataType( | |
| "image", "BINARY", adodbapi.Binary(binfld), allowedReturnValues=arv | |
| ) | |
| def helpRollbackTblTemp(self): | |
| self.helpForceDropOnTblTemp() | |
| def helpForceDropOnTblTemp(self): | |
| conn = self.getConnection() | |
| with conn.cursor() as crsr: | |
| try: | |
| crsr.execute("DROP TABLE xx_%s" % config.tmp) | |
| if not conn.autocommit: | |
| conn.commit() | |
| except: | |
| pass | |
| def helpCreateAndPopulateTableTemp(self, crsr): | |
| tabdef = ( | |
| """ | |
| CREATE TABLE xx_%s ( | |
| fldData INTEGER | |
| ) | |
| """ | |
| % config.tmp | |
| ) | |
| try: # EAFP | |
| crsr.execute(tabdef) | |
| except api.DatabaseError: # was not dropped before | |
| self.helpForceDropOnTblTemp() # so drop it now | |
| crsr.execute(tabdef) | |
| for i in range(9): # note: this poor SQL code, but a valid test | |
| crsr.execute("INSERT INTO xx_%s (fldData) VALUES (%i)" % (config.tmp, i)) | |
| # NOTE: building the test table without using parameter substitution | |
| def testFetchAll(self): | |
| crsr = self.getCursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) | |
| rs = crsr.fetchall() | |
| assert len(rs) == 9 | |
| # test slice of rows | |
| i = 3 | |
| for row in rs[3:-2]: # should have rowid 3..6 | |
| assert row[0] == i | |
| i += 1 | |
| self.helpRollbackTblTemp() | |
| def testPreparedStatement(self): | |
| crsr = self.getCursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| crsr.prepare("SELECT fldData FROM xx_%s" % config.tmp) | |
| crsr.execute(crsr.command) # remember the one that was prepared | |
| rs = crsr.fetchall() | |
| assert len(rs) == 9 | |
| assert rs[2][0] == 2 | |
| self.helpRollbackTblTemp() | |
| def testWrongPreparedStatement(self): | |
| crsr = self.getCursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| crsr.prepare("SELECT * FROM nowhere") | |
| crsr.execute( | |
| "SELECT fldData FROM xx_%s" % config.tmp | |
| ) # should execute this one, not the prepared one | |
| rs = crsr.fetchall() | |
| assert len(rs) == 9 | |
| assert rs[2][0] == 2 | |
| self.helpRollbackTblTemp() | |
| def testIterator(self): | |
| crsr = self.getCursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) | |
| for i, row in enumerate( | |
| crsr | |
| ): # using cursor as an iterator, rather than fetchxxx | |
| assert row[0] == i | |
| self.helpRollbackTblTemp() | |
| def testExecuteMany(self): | |
| crsr = self.getCursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| seq_of_values = [(111,), (222,)] | |
| crsr.executemany( | |
| "INSERT INTO xx_%s (fldData) VALUES (?)" % config.tmp, seq_of_values | |
| ) | |
| if crsr.rowcount == -1: | |
| print( | |
| self.getEngine() | |
| + " Provider does not support rowcount (on .executemany())" | |
| ) | |
| else: | |
| self.assertEqual(crsr.rowcount, 2) | |
| crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) | |
| rs = crsr.fetchall() | |
| assert len(rs) == 11 | |
| self.helpRollbackTblTemp() | |
| def testRowCount(self): | |
| crsr = self.getCursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) | |
| if crsr.rowcount == -1: | |
| # print("provider does not support rowcount on select") | |
| pass | |
| else: | |
| self.assertEqual(crsr.rowcount, 9) | |
| self.helpRollbackTblTemp() | |
| def testRowCountNoRecordset(self): | |
| crsr = self.getCursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| crsr.execute("DELETE FROM xx_%s WHERE fldData >= 5" % config.tmp) | |
| if crsr.rowcount == -1: | |
| print(self.getEngine() + " Provider does not support rowcount (on DELETE)") | |
| else: | |
| self.assertEqual(crsr.rowcount, 4) | |
| self.helpRollbackTblTemp() | |
| def testFetchMany(self): | |
| crsr = self.getCursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) | |
| rs = crsr.fetchmany(3) | |
| assert len(rs) == 3 | |
| rs = crsr.fetchmany(5) | |
| assert len(rs) == 5 | |
| rs = crsr.fetchmany(5) | |
| assert len(rs) == 1 # Asked for five, but there is only one left | |
| self.helpRollbackTblTemp() | |
| def testFetchManyWithArraySize(self): | |
| crsr = self.getCursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) | |
| rs = crsr.fetchmany() | |
| assert len(rs) == 1 # arraysize Defaults to one | |
| crsr.arraysize = 4 | |
| rs = crsr.fetchmany() | |
| assert len(rs) == 4 | |
| rs = crsr.fetchmany() | |
| assert len(rs) == 4 | |
| rs = crsr.fetchmany() | |
| assert len(rs) == 0 | |
| self.helpRollbackTblTemp() | |
| def testErrorConnect(self): | |
| conn = self.getConnection() | |
| kw = {} | |
| if "proxy_host" in conn.kwargs: | |
| kw["proxy_host"] = conn.kwargs["proxy_host"] | |
| conn.close() | |
| self.assertRaises(api.DatabaseError, self.db, "not a valid connect string", kw) | |
| def testRowIterator(self): | |
| self.helpForceDropOnTblTemp() | |
| conn = self.getConnection() | |
| crsr = conn.cursor() | |
| tabdef = ( | |
| """ | |
| CREATE TABLE xx_%s ( | |
| fldId integer NOT NULL, | |
| fldTwo integer, | |
| fldThree integer, | |
| fldFour integer) | |
| """ | |
| % config.tmp | |
| ) | |
| crsr.execute(tabdef) | |
| inputs = [(2, 3, 4), (102, 103, 104)] | |
| fldId = 1 | |
| for inParam in inputs: | |
| fldId += 1 | |
| try: | |
| crsr.execute( | |
| "INSERT INTO xx_%s (fldId,fldTwo,fldThree,fldFour) VALUES (?,?,?,?)" | |
| % config.tmp, | |
| (fldId, inParam[0], inParam[1], inParam[2]), | |
| ) | |
| except: | |
| if self.remote: | |
| for message in crsr.messages: | |
| print(message) | |
| else: | |
| conn.printADOerrors() | |
| raise | |
| crsr.execute( | |
| "SELECT fldTwo,fldThree,fldFour FROM xx_%s WHERE ?=fldID" % config.tmp, | |
| [fldId], | |
| ) | |
| rec = crsr.fetchone() | |
| # check that stepping through an emulated row works | |
| for j in range(len(inParam)): | |
| assert ( | |
| rec[j] == inParam[j] | |
| ), 'returned value:"%s" != test value:"%s"' % (rec[j], inParam[j]) | |
| # check that we can get a complete tuple from a row | |
| assert tuple(rec) == inParam, 'returned value:"%s" != test value:"%s"' % ( | |
| repr(rec), | |
| repr(inParam), | |
| ) | |
| # test that slices of rows work | |
| slice1 = tuple(rec[:-1]) | |
| slice2 = tuple(inParam[0:2]) | |
| assert slice1 == slice2, 'returned value:"%s" != test value:"%s"' % ( | |
| repr(slice1), | |
| repr(slice2), | |
| ) | |
| # now test named column retrieval | |
| assert rec["fldTwo"] == inParam[0] | |
| assert rec.fldThree == inParam[1] | |
| assert rec.fldFour == inParam[2] | |
| # test array operation | |
| # note that the fields vv vv vv are out of order | |
| crsr.execute("select fldThree,fldFour,fldTwo from xx_%s" % config.tmp) | |
| recs = crsr.fetchall() | |
| assert recs[1][0] == 103 | |
| assert recs[0][1] == 4 | |
| assert recs[1]["fldFour"] == 104 | |
| assert recs[0, 0] == 3 | |
| assert recs[0, "fldTwo"] == 2 | |
| assert recs[1, 2] == 102 | |
| for i in range(1): | |
| for j in range(2): | |
| assert recs[i][j] == recs[i, j] | |
| def testFormatParamstyle(self): | |
| self.helpForceDropOnTblTemp() | |
| conn = self.getConnection() | |
| conn.paramstyle = "format" # test nonstandard use of paramstyle | |
| crsr = conn.cursor() | |
| tabdef = ( | |
| """ | |
| CREATE TABLE xx_%s ( | |
| fldId integer NOT NULL, | |
| fldData varchar(10), | |
| fldConst varchar(30)) | |
| """ | |
| % config.tmp | |
| ) | |
| crsr.execute(tabdef) | |
| inputs = ["one", "two", "three"] | |
| fldId = 2 | |
| for inParam in inputs: | |
| fldId += 1 | |
| sql = ( | |
| "INSERT INTO xx_" | |
| + config.tmp | |
| + " (fldId,fldConst,fldData) VALUES (%s,'thi%s :may cause? trouble', %s)" | |
| ) | |
| try: | |
| crsr.execute(sql, (fldId, inParam)) | |
| except: | |
| if self.remote: | |
| for message in crsr.messages: | |
| print(message) | |
| else: | |
| conn.printADOerrors() | |
| raise | |
| crsr.execute( | |
| "SELECT fldData, fldConst FROM xx_" + config.tmp + " WHERE %s=fldID", | |
| [fldId], | |
| ) | |
| rec = crsr.fetchone() | |
| self.assertEqual( | |
| rec[0], | |
| inParam, | |
| 'returned value:"%s" != test value:"%s"' % (rec[0], inParam), | |
| ) | |
| self.assertEqual(rec[1], "thi%s :may cause? trouble") | |
| # now try an operation with a "%s" as part of a literal | |
| sel = ( | |
| "insert into xx_" + config.tmp + " (fldId,fldData) VALUES (%s,'four%sfive')" | |
| ) | |
| params = (20,) | |
| crsr.execute(sel, params) | |
| # test the .query implementation | |
| assert "(?," in crsr.query, 'expected:"%s" in "%s"' % ("(?,", crsr.query) | |
| # test the .command attribute | |
| assert crsr.command == sel, 'expected:"%s" but found "%s"' % (sel, crsr.command) | |
| # test the .parameters attribute | |
| if not self.remote: # parameter list will be altered in transit | |
| self.assertEqual(crsr.parameters, params) | |
| # now make sure the data made it | |
| crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=20" % config.tmp) | |
| rec = crsr.fetchone() | |
| self.assertEqual(rec[0], "four%sfive") | |
| def testNamedParamstyle(self): | |
| self.helpForceDropOnTblTemp() | |
| conn = self.getConnection() | |
| crsr = conn.cursor() | |
| crsr.paramstyle = "named" # test nonstandard use of paramstyle | |
| tabdef = ( | |
| """ | |
| CREATE TABLE xx_%s ( | |
| fldId integer NOT NULL, | |
| fldData varchar(10)) | |
| """ | |
| % config.tmp | |
| ) | |
| crsr.execute(tabdef) | |
| inputs = ["four", "five", "six"] | |
| fldId = 10 | |
| for inParam in inputs: | |
| fldId += 1 | |
| try: | |
| crsr.execute( | |
| "INSERT INTO xx_%s (fldId,fldData) VALUES (:Id,:f_Val)" | |
| % config.tmp, | |
| {"f_Val": inParam, "Id": fldId}, | |
| ) | |
| except: | |
| if self.remote: | |
| for message in crsr.messages: | |
| print(message) | |
| else: | |
| conn.printADOerrors() | |
| raise | |
| crsr.execute( | |
| "SELECT fldData FROM xx_%s WHERE fldID=:Id" % config.tmp, {"Id": fldId} | |
| ) | |
| rec = crsr.fetchone() | |
| self.assertEqual( | |
| rec[0], | |
| inParam, | |
| 'returned value:"%s" != test value:"%s"' % (rec[0], inParam), | |
| ) | |
| # now a test with a ":" as part of a literal | |
| crsr.execute( | |
| "insert into xx_%s (fldId,fldData) VALUES (:xyz,'six:five')" % config.tmp, | |
| {"xyz": 30}, | |
| ) | |
| crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp) | |
| rec = crsr.fetchone() | |
| self.assertEqual(rec[0], "six:five") | |
| def testPyformatParamstyle(self): | |
| self.helpForceDropOnTblTemp() | |
| conn = self.getConnection() | |
| crsr = conn.cursor() | |
| crsr.paramstyle = "pyformat" # test nonstandard use of paramstyle | |
| tabdef = ( | |
| """ | |
| CREATE TABLE xx_%s ( | |
| fldId integer NOT NULL, | |
| fldData varchar(10)) | |
| """ | |
| % config.tmp | |
| ) | |
| crsr.execute(tabdef) | |
| inputs = ["four", "five", "six"] | |
| fldId = 10 | |
| for inParam in inputs: | |
| fldId += 1 | |
| try: | |
| crsr.execute( | |
| "INSERT INTO xx_%s (fldId,fldData) VALUES (%%(Id)s,%%(f_Val)s)" | |
| % config.tmp, | |
| {"f_Val": inParam, "Id": fldId}, | |
| ) | |
| except: | |
| if self.remote: | |
| for message in crsr.messages: | |
| print(message) | |
| else: | |
| conn.printADOerrors() | |
| raise | |
| crsr.execute( | |
| "SELECT fldData FROM xx_%s WHERE fldID=%%(Id)s" % config.tmp, | |
| {"Id": fldId}, | |
| ) | |
| rec = crsr.fetchone() | |
| self.assertEqual( | |
| rec[0], | |
| inParam, | |
| 'returned value:"%s" != test value:"%s"' % (rec[0], inParam), | |
| ) | |
| # now a test with a "%" as part of a literal | |
| crsr.execute( | |
| "insert into xx_%s (fldId,fldData) VALUES (%%(xyz)s,'six%%five')" | |
| % config.tmp, | |
| {"xyz": 30}, | |
| ) | |
| crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp) | |
| rec = crsr.fetchone() | |
| self.assertEqual(rec[0], "six%five") | |
| def testAutomaticParamstyle(self): | |
| self.helpForceDropOnTblTemp() | |
| conn = self.getConnection() | |
| conn.paramstyle = "dynamic" # test nonstandard use of paramstyle | |
| crsr = conn.cursor() | |
| tabdef = ( | |
| """ | |
| CREATE TABLE xx_%s ( | |
| fldId integer NOT NULL, | |
| fldData varchar(10), | |
| fldConst varchar(30)) | |
| """ | |
| % config.tmp | |
| ) | |
| crsr.execute(tabdef) | |
| inputs = ["one", "two", "three"] | |
| fldId = 2 | |
| for inParam in inputs: | |
| fldId += 1 | |
| try: | |
| crsr.execute( | |
| "INSERT INTO xx_" | |
| + config.tmp | |
| + " (fldId,fldConst,fldData) VALUES (?,'thi%s :may cause? troub:1e', ?)", | |
| (fldId, inParam), | |
| ) | |
| except: | |
| if self.remote: | |
| for message in crsr.messages: | |
| print(message) | |
| else: | |
| conn.printADOerrors() | |
| raise | |
| trouble = "thi%s :may cause? troub:1e" | |
| crsr.execute( | |
| "SELECT fldData, fldConst FROM xx_" + config.tmp + " WHERE ?=fldID", | |
| [fldId], | |
| ) | |
| rec = crsr.fetchone() | |
| self.assertEqual( | |
| rec[0], | |
| inParam, | |
| 'returned value:"%s" != test value:"%s"' % (rec[0], inParam), | |
| ) | |
| self.assertEqual(rec[1], trouble) | |
| # inputs = [u'four',u'five',u'six'] | |
| fldId = 10 | |
| for inParam in inputs: | |
| fldId += 1 | |
| try: | |
| crsr.execute( | |
| "INSERT INTO xx_%s (fldId,fldData) VALUES (:Id,:f_Val)" | |
| % config.tmp, | |
| {"f_Val": inParam, "Id": fldId}, | |
| ) | |
| except: | |
| if self.remote: | |
| for message in crsr.messages: | |
| print(message) | |
| else: | |
| conn.printADOerrors() | |
| raise | |
| crsr.execute( | |
| "SELECT fldData FROM xx_%s WHERE :Id=fldID" % config.tmp, {"Id": fldId} | |
| ) | |
| rec = crsr.fetchone() | |
| self.assertEqual( | |
| rec[0], | |
| inParam, | |
| 'returned value:"%s" != test value:"%s"' % (rec[0], inParam), | |
| ) | |
| # now a test with a ":" as part of a literal -- and use a prepared query | |
| ppdcmd = ( | |
| "insert into xx_%s (fldId,fldData) VALUES (:xyz,'six:five')" % config.tmp | |
| ) | |
| crsr.prepare(ppdcmd) | |
| crsr.execute(ppdcmd, {"xyz": 30}) | |
| crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp) | |
| rec = crsr.fetchone() | |
| self.assertEqual(rec[0], "six:five") | |
| def testRollBack(self): | |
| conn = self.getConnection() | |
| crsr = conn.cursor() | |
| assert not crsr.connection.autocommit, "Unexpected beginning condition" | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| crsr.connection.commit() # commit the first bunch | |
| crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp) | |
| selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp | |
| crsr.execute(selectSql) | |
| rs = crsr.fetchall() | |
| assert len(rs) == 1 | |
| self.conn.rollback() | |
| crsr.execute(selectSql) | |
| assert ( | |
| crsr.fetchone() == None | |
| ), "cursor.fetchone should return None if a query retrieves no rows" | |
| crsr.execute("SELECT fldData from xx_%s" % config.tmp) | |
| rs = crsr.fetchall() | |
| assert len(rs) == 9, "the original records should still be present" | |
| self.helpRollbackTblTemp() | |
| def testCommit(self): | |
| try: | |
| con2 = self.getAnotherConnection() | |
| except NotImplementedError: | |
| return # should be "SKIP" for ACCESS | |
| assert not con2.autocommit, "default should be manual commit" | |
| crsr = con2.cursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp) | |
| con2.commit() | |
| selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp | |
| crsr.execute(selectSql) | |
| rs = crsr.fetchall() | |
| assert len(rs) == 1 | |
| crsr.close() | |
| con2.close() | |
| conn = self.getConnection() | |
| crsr = self.getCursor() | |
| with conn.cursor() as crsr: | |
| crsr.execute(selectSql) | |
| rs = crsr.fetchall() | |
| assert len(rs) == 1 | |
| assert rs[0][0] == 100 | |
| self.helpRollbackTblTemp() | |
| def testAutoRollback(self): | |
| try: | |
| con2 = self.getAnotherConnection() | |
| except NotImplementedError: | |
| return # should be "SKIP" for ACCESS | |
| assert not con2.autocommit, "unexpected beginning condition" | |
| crsr = con2.cursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp) | |
| selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp | |
| crsr.execute(selectSql) | |
| rs = crsr.fetchall() | |
| assert len(rs) == 1 | |
| crsr.close() | |
| con2.close() | |
| crsr = self.getCursor() | |
| try: | |
| crsr.execute( | |
| selectSql | |
| ) # closing the connection should have forced rollback | |
| row = crsr.fetchone() | |
| except api.DatabaseError: | |
| row = None # if the entire table disappeared the rollback was perfect and the test passed | |
| assert row == None, ( | |
| "cursor.fetchone should return None if a query retrieves no rows. Got %s" | |
| % repr(row) | |
| ) | |
| self.helpRollbackTblTemp() | |
| def testAutoCommit(self): | |
| try: | |
| ac_conn = self.getAnotherConnection({"autocommit": True}) | |
| except NotImplementedError: | |
| return # should be "SKIP" for ACCESS | |
| crsr = ac_conn.cursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp) | |
| crsr.close() | |
| with self.getCursor() as crsr: | |
| selectSql = "SELECT fldData from xx_%s" % config.tmp | |
| crsr.execute( | |
| selectSql | |
| ) # closing the connection should _not_ have forced rollback | |
| rs = crsr.fetchall() | |
| assert len(rs) == 10, "all records should still be present" | |
| ac_conn.close() | |
| self.helpRollbackTblTemp() | |
| def testSwitchedAutoCommit(self): | |
| try: | |
| ac_conn = self.getAnotherConnection() | |
| except NotImplementedError: | |
| return # should be "SKIP" for ACCESS | |
| ac_conn.autocommit = True | |
| crsr = ac_conn.cursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp) | |
| crsr.close() | |
| conn = self.getConnection() | |
| ac_conn.close() | |
| with self.getCursor() as crsr: | |
| selectSql = "SELECT fldData from xx_%s" % config.tmp | |
| crsr.execute( | |
| selectSql | |
| ) # closing the connection should _not_ have forced rollback | |
| rs = crsr.fetchall() | |
| assert len(rs) == 10, "all records should still be present" | |
| self.helpRollbackTblTemp() | |
| def testExtendedTypeHandling(self): | |
| class XtendString(str): | |
| pass | |
| class XtendInt(int): | |
| pass | |
| class XtendFloat(float): | |
| pass | |
| xs = XtendString(randomstring(30)) | |
| xi = XtendInt(random.randint(-100, 500)) | |
| xf = XtendFloat(random.random()) | |
| self.helpForceDropOnTblTemp() | |
| conn = self.getConnection() | |
| crsr = conn.cursor() | |
| tabdef = ( | |
| """ | |
| CREATE TABLE xx_%s ( | |
| s VARCHAR(40) NOT NULL, | |
| i INTEGER NOT NULL, | |
| f REAL NOT NULL)""" | |
| % config.tmp | |
| ) | |
| crsr.execute(tabdef) | |
| crsr.execute( | |
| "INSERT INTO xx_%s (s, i, f) VALUES (?, ?, ?)" % config.tmp, (xs, xi, xf) | |
| ) | |
| crsr.close() | |
| conn = self.getConnection() | |
| with self.getCursor() as crsr: | |
| selectSql = "SELECT s, i, f from xx_%s" % config.tmp | |
| crsr.execute( | |
| selectSql | |
| ) # closing the connection should _not_ have forced rollback | |
| row = crsr.fetchone() | |
| self.assertEqual(row.s, xs) | |
| self.assertEqual(row.i, xi) | |
| self.assertAlmostEqual(row.f, xf) | |
| self.helpRollbackTblTemp() | |
| class TestADOwithSQLServer(CommonDBTests): | |
| def setUp(self): | |
| self.conn = config.dbSqlServerconnect( | |
| *config.connStrSQLServer[0], **config.connStrSQLServer[1] | |
| ) | |
| self.conn.timeout = 30 # turn timeout back up | |
| self.engine = "MSSQL" | |
| self.db = config.dbSqlServerconnect | |
| self.remote = config.connStrSQLServer[2] | |
| def tearDown(self): | |
| try: | |
| self.conn.rollback() | |
| except: | |
| pass | |
| try: | |
| self.conn.close() | |
| except: | |
| pass | |
| self.conn = None | |
| def getConnection(self): | |
| return self.conn | |
| def getAnotherConnection(self, addkeys=None): | |
| keys = dict(config.connStrSQLServer[1]) | |
| if addkeys: | |
| keys.update(addkeys) | |
| return config.dbSqlServerconnect(*config.connStrSQLServer[0], **keys) | |
| def testVariableReturningStoredProcedure(self): | |
| crsr = self.conn.cursor() | |
| spdef = """ | |
| CREATE PROCEDURE sp_DeleteMeOnlyForTesting | |
| @theInput varchar(50), | |
| @theOtherInput varchar(50), | |
| @theOutput varchar(100) OUTPUT | |
| AS | |
| SET @theOutput=@theInput+@theOtherInput | |
| """ | |
| try: | |
| crsr.execute("DROP PROCEDURE sp_DeleteMeOnlyForTesting") | |
| self.conn.commit() | |
| except: # Make sure it is empty | |
| pass | |
| crsr.execute(spdef) | |
| retvalues = crsr.callproc( | |
| "sp_DeleteMeOnlyForTesting", ("Dodsworth", "Anne", " ") | |
| ) | |
| assert retvalues[0] == "Dodsworth", '%s is not "Dodsworth"' % repr(retvalues[0]) | |
| assert retvalues[1] == "Anne", '%s is not "Anne"' % repr(retvalues[1]) | |
| assert retvalues[2] == "DodsworthAnne", '%s is not "DodsworthAnne"' % repr( | |
| retvalues[2] | |
| ) | |
| self.conn.rollback() | |
| def testMultipleSetReturn(self): | |
| crsr = self.getCursor() | |
| self.helpCreateAndPopulateTableTemp(crsr) | |
| spdef = """ | |
| CREATE PROCEDURE sp_DeleteMe_OnlyForTesting | |
| AS | |
| SELECT fldData FROM xx_%s ORDER BY fldData ASC | |
| SELECT fldData From xx_%s where fldData = -9999 | |
| SELECT fldData FROM xx_%s ORDER BY fldData DESC | |
| """ % ( | |
| config.tmp, | |
| config.tmp, | |
| config.tmp, | |
| ) | |
| try: | |
| crsr.execute("DROP PROCEDURE sp_DeleteMe_OnlyForTesting") | |
| self.conn.commit() | |
| except: # Make sure it is empty | |
| pass | |
| crsr.execute(spdef) | |
| retvalues = crsr.callproc("sp_DeleteMe_OnlyForTesting") | |
| row = crsr.fetchone() | |
| self.assertEqual(row[0], 0) | |
| assert crsr.nextset() == True, "Operation should succeed" | |
| assert not crsr.fetchall(), "Should be an empty second set" | |
| assert crsr.nextset() == True, "third set should be present" | |
| rowdesc = crsr.fetchall() | |
| self.assertEqual(rowdesc[0][0], 8) | |
| assert crsr.nextset() == None, "No more return sets, should return None" | |
| self.helpRollbackTblTemp() | |
| def testDatetimeProcedureParameter(self): | |
| crsr = self.conn.cursor() | |
| spdef = """ | |
| CREATE PROCEDURE sp_DeleteMeOnlyForTesting | |
| @theInput DATETIME, | |
| @theOtherInput varchar(50), | |
| @theOutput varchar(100) OUTPUT | |
| AS | |
| SET @theOutput = CONVERT(CHARACTER(20), @theInput, 0) + @theOtherInput | |
| """ | |
| try: | |
| crsr.execute("DROP PROCEDURE sp_DeleteMeOnlyForTesting") | |
| self.conn.commit() | |
| except: # Make sure it is empty | |
| pass | |
| crsr.execute(spdef) | |
| result = crsr.callproc( | |
| "sp_DeleteMeOnlyForTesting", | |
| [adodbapi.Timestamp(2014, 12, 25, 0, 1, 0), "Beep", " " * 30], | |
| ) | |
| assert result[2] == "Dec 25 2014 12:01AM Beep", 'value was="%s"' % result[2] | |
| self.conn.rollback() | |
| def testIncorrectStoredProcedureParameter(self): | |
| crsr = self.conn.cursor() | |
| spdef = """ | |
| CREATE PROCEDURE sp_DeleteMeOnlyForTesting | |
| @theInput DATETIME, | |
| @theOtherInput varchar(50), | |
| @theOutput varchar(100) OUTPUT | |
| AS | |
| SET @theOutput = CONVERT(CHARACTER(20), @theInput) + @theOtherInput | |
| """ | |
| try: | |
| crsr.execute("DROP PROCEDURE sp_DeleteMeOnlyForTesting") | |
| self.conn.commit() | |
| except: # Make sure it is empty | |
| pass | |
| crsr.execute(spdef) | |
| # calling the sproc with a string for the first parameter where a DateTime is expected | |
| result = tryconnection.try_operation_with_expected_exception( | |
| (api.DataError, api.DatabaseError), | |
| crsr.callproc, | |
| ["sp_DeleteMeOnlyForTesting"], | |
| {"parameters": ["this is wrong", "Anne", "not Alice"]}, | |
| ) | |
| if result[0]: # the expected exception was raised | |
| assert "@theInput" in str(result[1]) or "DatabaseError" in str( | |
| result | |
| ), "Identifies the wrong erroneous parameter" | |
| else: | |
| assert result[0], result[1] # incorrect or no exception | |
| self.conn.rollback() | |
| class TestADOwithAccessDB(CommonDBTests): | |
| def setUp(self): | |
| self.conn = config.dbAccessconnect( | |
| *config.connStrAccess[0], **config.connStrAccess[1] | |
| ) | |
| self.conn.timeout = 30 # turn timeout back up | |
| self.engine = "ACCESS" | |
| self.db = config.dbAccessconnect | |
| self.remote = config.connStrAccess[2] | |
| def tearDown(self): | |
| try: | |
| self.conn.rollback() | |
| except: | |
| pass | |
| try: | |
| self.conn.close() | |
| except: | |
| pass | |
| self.conn = None | |
| def getConnection(self): | |
| return self.conn | |
| def getAnotherConnection(self, addkeys=None): | |
| raise NotImplementedError("Jet cannot use a second connection to the database") | |
| def testOkConnect(self): | |
| c = self.db(*config.connStrAccess[0], **config.connStrAccess[1]) | |
| assert c != None | |
| c.close() | |
| class TestADOwithMySql(CommonDBTests): | |
| def setUp(self): | |
| self.conn = config.dbMySqlconnect( | |
| *config.connStrMySql[0], **config.connStrMySql[1] | |
| ) | |
| self.conn.timeout = 30 # turn timeout back up | |
| self.engine = "MySQL" | |
| self.db = config.dbMySqlconnect | |
| self.remote = config.connStrMySql[2] | |
| def tearDown(self): | |
| try: | |
| self.conn.rollback() | |
| except: | |
| pass | |
| try: | |
| self.conn.close() | |
| except: | |
| pass | |
| self.conn = None | |
| def getConnection(self): | |
| return self.conn | |
| def getAnotherConnection(self, addkeys=None): | |
| keys = dict(config.connStrMySql[1]) | |
| if addkeys: | |
| keys.update(addkeys) | |
| return config.dbMySqlconnect(*config.connStrMySql[0], **keys) | |
| def testOkConnect(self): | |
| c = self.db(*config.connStrMySql[0], **config.connStrMySql[1]) | |
| assert c != None | |
| # def testStoredProcedure(self): | |
| # crsr=self.conn.cursor() | |
| # try: | |
| # crsr.execute("DROP PROCEDURE DeleteMeOnlyForTesting") | |
| # self.conn.commit() | |
| # except: #Make sure it is empty | |
| # pass | |
| # spdef= """ | |
| # DELIMITER $$ | |
| # CREATE PROCEDURE DeleteMeOnlyForTesting (onein CHAR(10), twoin CHAR(10), OUT theout CHAR(20)) | |
| # DETERMINISTIC | |
| # BEGIN | |
| # SET theout = onein //|| twoin; | |
| # /* (SELECT 'a small string' as result; */ | |
| # END $$ | |
| # """ | |
| # | |
| # crsr.execute(spdef) | |
| # | |
| # retvalues=crsr.callproc('DeleteMeOnlyForTesting',('Dodsworth','Anne',' ')) | |
| # print 'return value (mysql)=',repr(crsr.returnValue) ### | |
| # assert retvalues[0]=='Dodsworth', '%s is not "Dodsworth"'%repr(retvalues[0]) | |
| # assert retvalues[1]=='Anne','%s is not "Anne"'%repr(retvalues[1]) | |
| # assert retvalues[2]=='DodsworthAnne','%s is not "DodsworthAnne"'%repr(retvalues[2]) | |
| # | |
| # try: | |
| # crsr.execute("DROP PROCEDURE, DeleteMeOnlyForTesting") | |
| # self.conn.commit() | |
| # except: #Make sure it is empty | |
| # pass | |
| class TestADOwithPostgres(CommonDBTests): | |
| def setUp(self): | |
| self.conn = config.dbPostgresConnect( | |
| *config.connStrPostgres[0], **config.connStrPostgres[1] | |
| ) | |
| self.conn.timeout = 30 # turn timeout back up | |
| self.engine = "PostgreSQL" | |
| self.db = config.dbPostgresConnect | |
| self.remote = config.connStrPostgres[2] | |
| def tearDown(self): | |
| try: | |
| self.conn.rollback() | |
| except: | |
| pass | |
| try: | |
| self.conn.close() | |
| except: | |
| pass | |
| self.conn = None | |
| def getConnection(self): | |
| return self.conn | |
| def getAnotherConnection(self, addkeys=None): | |
| keys = dict(config.connStrPostgres[1]) | |
| if addkeys: | |
| keys.update(addkeys) | |
| return config.dbPostgresConnect(*config.connStrPostgres[0], **keys) | |
| def testOkConnect(self): | |
| c = self.db(*config.connStrPostgres[0], **config.connStrPostgres[1]) | |
| assert c != None | |
| # def testStoredProcedure(self): | |
| # crsr=self.conn.cursor() | |
| # spdef= """ | |
| # CREATE OR REPLACE FUNCTION DeleteMeOnlyForTesting (text, text) | |
| # RETURNS text AS $funk$ | |
| # BEGIN | |
| # RETURN $1 || $2; | |
| # END; | |
| # $funk$ | |
| # LANGUAGE SQL; | |
| # """ | |
| # | |
| # crsr.execute(spdef) | |
| # retvalues = crsr.callproc('DeleteMeOnlyForTesting',('Dodsworth','Anne',' ')) | |
| # ### print 'return value (pg)=',repr(crsr.returnValue) ### | |
| # assert retvalues[0]=='Dodsworth', '%s is not "Dodsworth"'%repr(retvalues[0]) | |
| # assert retvalues[1]=='Anne','%s is not "Anne"'%repr(retvalues[1]) | |
| # assert retvalues[2]=='Dodsworth Anne','%s is not "Dodsworth Anne"'%repr(retvalues[2]) | |
| # self.conn.rollback() | |
| # try: | |
| # crsr.execute("DROP PROCEDURE, DeleteMeOnlyForTesting") | |
| # self.conn.commit() | |
| # except: #Make sure it is empty | |
| # pass | |
| class TimeConverterInterfaceTest(unittest.TestCase): | |
| def testIDate(self): | |
| assert self.tc.Date(1990, 2, 2) | |
| def testITime(self): | |
| assert self.tc.Time(13, 2, 2) | |
| def testITimestamp(self): | |
| assert self.tc.Timestamp(1990, 2, 2, 13, 2, 1) | |
| def testIDateObjectFromCOMDate(self): | |
| assert self.tc.DateObjectFromCOMDate(37435.7604282) | |
| def testICOMDate(self): | |
| assert hasattr(self.tc, "COMDate") | |
| def testExactDate(self): | |
| d = self.tc.Date(1994, 11, 15) | |
| comDate = self.tc.COMDate(d) | |
| correct = 34653.0 | |
| assert comDate == correct, comDate | |
| def testExactTimestamp(self): | |
| d = self.tc.Timestamp(1994, 11, 15, 12, 0, 0) | |
| comDate = self.tc.COMDate(d) | |
| correct = 34653.5 | |
| self.assertEqual(comDate, correct) | |
| d = self.tc.Timestamp(2003, 5, 6, 14, 15, 17) | |
| comDate = self.tc.COMDate(d) | |
| correct = 37747.593946759262 | |
| self.assertEqual(comDate, correct) | |
| def testIsoFormat(self): | |
| d = self.tc.Timestamp(1994, 11, 15, 12, 3, 10) | |
| iso = self.tc.DateObjectToIsoFormatString(d) | |
| self.assertEqual(str(iso[:19]), "1994-11-15 12:03:10") | |
| dt = self.tc.Date(2003, 5, 2) | |
| iso = self.tc.DateObjectToIsoFormatString(dt) | |
| self.assertEqual(str(iso[:10]), "2003-05-02") | |
| if config.doMxDateTimeTest: | |
| import mx.DateTime | |
| class TestMXDateTimeConverter(TimeConverterInterfaceTest): | |
| def setUp(self): | |
| self.tc = api.mxDateTimeConverter() | |
| def testCOMDate(self): | |
| t = mx.DateTime.DateTime(2002, 6, 28, 18, 15, 2) | |
| cmd = self.tc.COMDate(t) | |
| assert cmd == t.COMDate() | |
| def testDateObjectFromCOMDate(self): | |
| cmd = self.tc.DateObjectFromCOMDate(37435.7604282) | |
| t = mx.DateTime.DateTime(2002, 6, 28, 18, 15, 0) | |
| t2 = mx.DateTime.DateTime(2002, 6, 28, 18, 15, 2) | |
| assert t2 > cmd > t | |
| def testDate(self): | |
| assert mx.DateTime.Date(1980, 11, 4) == self.tc.Date(1980, 11, 4) | |
| def testTime(self): | |
| assert mx.DateTime.Time(13, 11, 4) == self.tc.Time(13, 11, 4) | |
| def testTimestamp(self): | |
| t = mx.DateTime.DateTime(2002, 6, 28, 18, 15, 1) | |
| obj = self.tc.Timestamp(2002, 6, 28, 18, 15, 1) | |
| assert t == obj | |
| import time | |
| class TestPythonTimeConverter(TimeConverterInterfaceTest): | |
| def setUp(self): | |
| self.tc = api.pythonTimeConverter() | |
| def testCOMDate(self): | |
| mk = time.mktime((2002, 6, 28, 18, 15, 1, 4, 31 + 28 + 31 + 30 + 31 + 28, -1)) | |
| t = time.localtime(mk) | |
| # Fri, 28 Jun 2002 18:15:01 +0000 | |
| cmd = self.tc.COMDate(t) | |
| assert abs(cmd - 37435.7604282) < 1.0 / 24, "%f more than an hour wrong" % cmd | |
| def testDateObjectFromCOMDate(self): | |
| cmd = self.tc.DateObjectFromCOMDate(37435.7604282) | |
| t1 = time.gmtime( | |
| time.mktime((2002, 6, 28, 0, 14, 1, 4, 31 + 28 + 31 + 30 + 31 + 28, -1)) | |
| ) | |
| # there are errors in the implementation of gmtime which we ignore | |
| t2 = time.gmtime( | |
| time.mktime((2002, 6, 29, 12, 14, 2, 4, 31 + 28 + 31 + 30 + 31 + 28, -1)) | |
| ) | |
| assert t1 < cmd < t2, '"%s" should be about 2002-6-28 12:15:01' % repr(cmd) | |
| def testDate(self): | |
| t1 = time.mktime((2002, 6, 28, 18, 15, 1, 4, 31 + 28 + 31 + 30 + 31 + 30, 0)) | |
| t2 = time.mktime((2002, 6, 30, 18, 15, 1, 4, 31 + 28 + 31 + 30 + 31 + 28, 0)) | |
| obj = self.tc.Date(2002, 6, 29) | |
| assert t1 < time.mktime(obj) < t2, obj | |
| def testTime(self): | |
| self.assertEqual( | |
| self.tc.Time(18, 15, 2), time.gmtime(18 * 60 * 60 + 15 * 60 + 2) | |
| ) | |
| def testTimestamp(self): | |
| t1 = time.localtime( | |
| time.mktime((2002, 6, 28, 18, 14, 1, 4, 31 + 28 + 31 + 30 + 31 + 28, -1)) | |
| ) | |
| t2 = time.localtime( | |
| time.mktime((2002, 6, 28, 18, 16, 1, 4, 31 + 28 + 31 + 30 + 31 + 28, -1)) | |
| ) | |
| obj = self.tc.Timestamp(2002, 6, 28, 18, 15, 2) | |
| assert t1 < obj < t2, obj | |
| class TestPythonDateTimeConverter(TimeConverterInterfaceTest): | |
| def setUp(self): | |
| self.tc = api.pythonDateTimeConverter() | |
| def testCOMDate(self): | |
| t = datetime.datetime(2002, 6, 28, 18, 15, 1) | |
| # Fri, 28 Jun 2002 18:15:01 +0000 | |
| cmd = self.tc.COMDate(t) | |
| assert abs(cmd - 37435.7604282) < 1.0 / 24, "more than an hour wrong" | |
| def testDateObjectFromCOMDate(self): | |
| cmd = self.tc.DateObjectFromCOMDate(37435.7604282) | |
| t1 = datetime.datetime(2002, 6, 28, 18, 14, 1) | |
| t2 = datetime.datetime(2002, 6, 28, 18, 16, 1) | |
| assert t1 < cmd < t2, cmd | |
| tx = datetime.datetime( | |
| 2002, 6, 28, 18, 14, 1, 900000 | |
| ) # testing that microseconds don't become milliseconds | |
| c1 = self.tc.DateObjectFromCOMDate(self.tc.COMDate(tx)) | |
| assert t1 < c1 < t2, c1 | |
| def testDate(self): | |
| t1 = datetime.date(2002, 6, 28) | |
| t2 = datetime.date(2002, 6, 30) | |
| obj = self.tc.Date(2002, 6, 29) | |
| assert t1 < obj < t2, obj | |
| def testTime(self): | |
| self.assertEqual(self.tc.Time(18, 15, 2).isoformat()[:8], "18:15:02") | |
| def testTimestamp(self): | |
| t1 = datetime.datetime(2002, 6, 28, 18, 14, 1) | |
| t2 = datetime.datetime(2002, 6, 28, 18, 16, 1) | |
| obj = self.tc.Timestamp(2002, 6, 28, 18, 15, 2) | |
| assert t1 < obj < t2, obj | |
| suites = [] | |
| suites.append(unittest.makeSuite(TestPythonDateTimeConverter, "test")) | |
| if config.doMxDateTimeTest: | |
| suites.append(unittest.makeSuite(TestMXDateTimeConverter, "test")) | |
| if config.doTimeTest: | |
| suites.append(unittest.makeSuite(TestPythonTimeConverter, "test")) | |
| if config.doAccessTest: | |
| suites.append(unittest.makeSuite(TestADOwithAccessDB, "test")) | |
| if config.doSqlServerTest: | |
| suites.append(unittest.makeSuite(TestADOwithSQLServer, "test")) | |
| if config.doMySqlTest: | |
| suites.append(unittest.makeSuite(TestADOwithMySql, "test")) | |
| if config.doPostgresTest: | |
| suites.append(unittest.makeSuite(TestADOwithPostgres, "test")) | |
| class cleanup_manager(object): | |
| def __enter__(self): | |
| pass | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| config.cleanup(config.testfolder, config.mdb_name) | |
| suite = unittest.TestSuite(suites) | |
| if __name__ == "__main__": | |
| mysuite = copy.deepcopy(suite) | |
| with cleanup_manager(): | |
| defaultDateConverter = adodbapi.dateconverter | |
| print(__doc__) | |
| print("Default Date Converter is %s" % (defaultDateConverter,)) | |
| dateconverter = defaultDateConverter | |
| tag = "datetime" | |
| unittest.TextTestRunner().run(mysuite) | |
| if config.iterateOverTimeTests: | |
| for test, dateconverter, tag in ( | |
| (config.doTimeTest, api.pythonTimeConverter, "pythontime"), | |
| (config.doMxDateTimeTest, api.mxDateTimeConverter, "mx"), | |
| ): | |
| if test: | |
| mysuite = copy.deepcopy( | |
| suite | |
| ) # work around a side effect of unittest.TextTestRunner | |
| adodbapi.adodbapi.dateconverter = dateconverter() | |
| print("Changed dateconverter to ") | |
| print(adodbapi.adodbapi.dateconverter) | |
| unittest.TextTestRunner().run(mysuite) | |