import datetime import time import warnings import sys import pymysql from pymysql import cursors from pymysql._compat import text_type from pymysql.tests import base import unittest2 try: import imp reload = imp.reload except AttributeError: pass __all__ = ["TestOldIssues", "TestNewIssues", "TestGitHubIssues"] class TestOldIssues(base.PyMySQLTestCase): def test_issue_3(self): """ undefined methods datetime_or_None, date_or_None """ conn = self.connections[0] c = conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists issue3") c.execute("create table issue3 (d date, t time, dt datetime, ts timestamp)") try: c.execute("insert into issue3 (d, t, dt, ts) values (%s,%s,%s,%s)", (None, None, None, None)) c.execute("select d from issue3") self.assertEqual(None, c.fetchone()[0]) c.execute("select t from issue3") self.assertEqual(None, c.fetchone()[0]) c.execute("select dt from issue3") self.assertEqual(None, c.fetchone()[0]) c.execute("select ts from issue3") self.assertTrue(isinstance(c.fetchone()[0], datetime.datetime)) finally: c.execute("drop table issue3") def test_issue_4(self): """ can't retrieve TIMESTAMP fields """ conn = self.connections[0] c = conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists issue4") c.execute("create table issue4 (ts timestamp)") try: c.execute("insert into issue4 (ts) values (now())") c.execute("select ts from issue4") self.assertTrue(isinstance(c.fetchone()[0], datetime.datetime)) finally: c.execute("drop table issue4") def test_issue_5(self): """ query on information_schema.tables fails """ con = self.connections[0] cur = con.cursor() cur.execute("select * from information_schema.tables") def test_issue_6(self): """ exception: TypeError: ord() expected a character, but string of length 0 found """ # ToDo: this test requires access to db 'mysql'. kwargs = self.databases[0].copy() kwargs['db'] = "mysql" conn = pymysql.connect(**kwargs) c = conn.cursor() c.execute("select * from user") conn.close() def test_issue_8(self): """ Primary Key and Index error when selecting data """ conn = self.connections[0] c = conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists test") c.execute("""CREATE TABLE `test` (`station` int(10) NOT NULL DEFAULT '0', `dh` datetime NOT NULL DEFAULT '2015-01-01 00:00:00', `echeance` int(1) NOT NULL DEFAULT '0', `me` double DEFAULT NULL, `mo` double DEFAULT NULL, PRIMARY KEY (`station`,`dh`,`echeance`)) ENGINE=MyISAM DEFAULT CHARSET=latin1;""") try: self.assertEqual(0, c.execute("SELECT * FROM test")) c.execute("ALTER TABLE `test` ADD INDEX `idx_station` (`station`)") self.assertEqual(0, c.execute("SELECT * FROM test")) finally: c.execute("drop table test") def test_issue_9(self): """ sets DeprecationWarning in Python 2.6 """ try: reload(pymysql) except DeprecationWarning: self.fail() def test_issue_13(self): """ can't handle large result fields """ conn = self.connections[0] cur = conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore") cur.execute("drop table if exists issue13") try: cur.execute("create table issue13 (t text)") # ticket says 18k size = 18*1024 cur.execute("insert into issue13 (t) values (%s)", ("x" * size,)) cur.execute("select t from issue13") # use assertTrue so that obscenely huge error messages don't print r = cur.fetchone()[0] self.assertTrue("x" * size == r) finally: cur.execute("drop table issue13") def test_issue_15(self): """ query should be expanded before perform character encoding """ conn = self.connections[0] c = conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists issue15") c.execute("create table issue15 (t varchar(32))") try: c.execute("insert into issue15 (t) values (%s)", (u'\xe4\xf6\xfc',)) c.execute("select t from issue15") self.assertEqual(u'\xe4\xf6\xfc', c.fetchone()[0]) finally: c.execute("drop table issue15") def test_issue_16(self): """ Patch for string and tuple escaping """ conn = self.connections[0] c = conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists issue16") c.execute("create table issue16 (name varchar(32) primary key, email varchar(32))") try: c.execute("insert into issue16 (name, email) values ('pete', 'floydophone')") c.execute("select email from issue16 where name=%s", ("pete",)) self.assertEqual("floydophone", c.fetchone()[0]) finally: c.execute("drop table issue16") @unittest2.skip("test_issue_17() requires a custom, legacy MySQL configuration and will not be run.") def test_issue_17(self): """could not connect mysql use passwod""" conn = self.connections[0] host = self.databases[0]["host"] db = self.databases[0]["db"] c = conn.cursor() # grant access to a table to a user with a password try: with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists issue17") c.execute("create table issue17 (x varchar(32) primary key)") c.execute("insert into issue17 (x) values ('hello, world!')") c.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db) conn.commit() conn2 = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db) c2 = conn2.cursor() c2.execute("select x from issue17") self.assertEqual("hello, world!", c2.fetchone()[0]) finally: c.execute("drop table issue17") class TestNewIssues(base.PyMySQLTestCase): def test_issue_34(self): try: pymysql.connect(host="localhost", port=1237, user="root") self.fail() except pymysql.OperationalError as e: self.assertEqual(2003, e.args[0]) except Exception: self.fail() def test_issue_33(self): conn = pymysql.connect(charset="utf8", **self.databases[0]) self.safe_create_table(conn, u'hei\xdfe', u'create table hei\xdfe (name varchar(32))') c = conn.cursor() c.execute(u"insert into hei\xdfe (name) values ('Pi\xdfata')") c.execute(u"select name from hei\xdfe") self.assertEqual(u"Pi\xdfata", c.fetchone()[0]) @unittest2.skip("This test requires manual intervention") def test_issue_35(self): conn = self.connections[0] c = conn.cursor() print("sudo killall -9 mysqld within the next 10 seconds") try: c.execute("select sleep(10)") self.fail() except pymysql.OperationalError as e: self.assertEqual(2013, e.args[0]) def test_issue_36(self): # connection 0 is super user, connection 1 isn't conn = self.connections[1] c = conn.cursor() c.execute("show processlist") kill_id = None for row in c.fetchall(): id = row[0] info = row[7] if info == "show processlist": kill_id = id break self.assertEqual(kill_id, conn.thread_id()) # now nuke the connection self.connections[0].kill(kill_id) # make sure this connection has broken try: c.execute("show tables") self.fail() except Exception: pass c.close() conn.close() # check the process list from the other connection try: # Wait since Travis-CI sometimes fail this test. time.sleep(0.1) c = self.connections[0].cursor() c.execute("show processlist") ids = [row[0] for row in c.fetchall()] self.assertFalse(kill_id in ids) finally: del self.connections[1] def test_issue_37(self): conn = self.connections[0] c = conn.cursor() self.assertEqual(1, c.execute("SELECT @foo")) self.assertEqual((None,), c.fetchone()) self.assertEqual(0, c.execute("SET @foo = 'bar'")) c.execute("set @foo = 'bar'") def test_issue_38(self): conn = self.connections[0] c = conn.cursor() datum = "a" * 1024 * 1023 # reduced size for most default mysql installs try: with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists issue38") c.execute("create table issue38 (id integer, data mediumblob)") c.execute("insert into issue38 values (1, %s)", (datum,)) finally: c.execute("drop table issue38") def disabled_test_issue_54(self): conn = self.connections[0] c = conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists issue54") big_sql = "select * from issue54 where " big_sql += " and ".join("%d=%d" % (i,i) for i in range(0, 100000)) try: c.execute("create table issue54 (id integer primary key)") c.execute("insert into issue54 (id) values (7)") c.execute(big_sql) self.assertEqual(7, c.fetchone()[0]) finally: c.execute("drop table issue54") class TestGitHubIssues(base.PyMySQLTestCase): def test_issue_66(self): """ 'Connection' object has no attribute 'insert_id' """ conn = self.connections[0] c = conn.cursor() self.assertEqual(0, conn.insert_id()) try: with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists issue66") c.execute("create table issue66 (id integer primary key auto_increment, x integer)") c.execute("insert into issue66 (x) values (1)") c.execute("insert into issue66 (x) values (1)") self.assertEqual(2, conn.insert_id()) finally: c.execute("drop table issue66") def test_issue_79(self): """ Duplicate field overwrites the previous one in the result of DictCursor """ conn = self.connections[0] c = conn.cursor(pymysql.cursors.DictCursor) with warnings.catch_warnings(): warnings.filterwarnings("ignore") c.execute("drop table if exists a") c.execute("drop table if exists b") c.execute("""CREATE TABLE a (id int, value int)""") c.execute("""CREATE TABLE b (id int, value int)""") a=(1,11) b=(1,22) try: c.execute("insert into a values (%s, %s)", a) c.execute("insert into b values (%s, %s)", b) c.execute("SELECT * FROM a inner join b on a.id = b.id") r = c.fetchall()[0] self.assertEqual(r['id'], 1) self.assertEqual(r['value'], 11) self.assertEqual(r['b.value'], 22) finally: c.execute("drop table a") c.execute("drop table b") def test_issue_95(self): """ Leftover trailing OK packet for "CALL my_sp" queries """ conn = self.connections[0] cur = conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore") cur.execute("DROP PROCEDURE IF EXISTS `foo`") cur.execute("""CREATE PROCEDURE `foo` () BEGIN SELECT 1; END""") try: cur.execute("""CALL foo()""") cur.execute("""SELECT 1""") self.assertEqual(cur.fetchone()[0], 1) finally: with warnings.catch_warnings(): warnings.filterwarnings("ignore") cur.execute("DROP PROCEDURE IF EXISTS `foo`") def test_issue_114(self): """ autocommit is not set after reconnecting with ping() """ conn = pymysql.connect(charset="utf8", **self.databases[0]) conn.autocommit(False) c = conn.cursor() c.execute("""select @@autocommit;""") self.assertFalse(c.fetchone()[0]) conn.close() conn.ping() c.execute("""select @@autocommit;""") self.assertFalse(c.fetchone()[0]) conn.close() # Ensure autocommit() is still working conn = pymysql.connect(charset="utf8", **self.databases[0]) c = conn.cursor() c.execute("""select @@autocommit;""") self.assertFalse(c.fetchone()[0]) conn.close() conn.ping() conn.autocommit(True) c.execute("""select @@autocommit;""") self.assertTrue(c.fetchone()[0]) conn.close() def test_issue_175(self): """ The number of fields returned by server is read in wrong way """ conn = self.connections[0] cur = conn.cursor() for length in (200, 300): columns = ', '.join('c{0} integer'.format(i) for i in range(length)) sql = 'create table test_field_count ({0})'.format(columns) try: cur.execute(sql) cur.execute('select * from test_field_count') assert len(cur.description) == length finally: with warnings.catch_warnings(): warnings.filterwarnings("ignore") cur.execute('drop table if exists test_field_count') def test_issue_321(self): """ Test iterable as query argument. """ conn = pymysql.connect(charset="utf8", **self.databases[0]) self.safe_create_table( conn, "issue321", "create table issue321 (value_1 varchar(1), value_2 varchar(1))") sql_insert = "insert into issue321 (value_1, value_2) values (%s, %s)" sql_dict_insert = ("insert into issue321 (value_1, value_2) " "values (%(value_1)s, %(value_2)s)") sql_select = ("select * from issue321 where " "value_1 in %s and value_2=%s") data = [ [(u"a", ), u"\u0430"], [[u"b"], u"\u0430"], {"value_1": [[u"c"]], "value_2": u"\u0430"} ] cur = conn.cursor() self.assertEqual(cur.execute(sql_insert, data[0]), 1) self.assertEqual(cur.execute(sql_insert, data[1]), 1) self.assertEqual(cur.execute(sql_dict_insert, data[2]), 1) self.assertEqual( cur.execute(sql_select, [(u"a", u"b", u"c"), u"\u0430"]), 3) self.assertEqual(cur.fetchone(), (u"a", u"\u0430")) self.assertEqual(cur.fetchone(), (u"b", u"\u0430")) self.assertEqual(cur.fetchone(), (u"c", u"\u0430")) def test_issue_364(self): """ Test mixed unicode/binary arguments in executemany. """ conn = pymysql.connect(charset="utf8", **self.databases[0]) self.safe_create_table( conn, "issue364", "create table issue364 (value_1 binary(3), value_2 varchar(3)) " "engine=InnoDB default charset=utf8") sql = "insert into issue364 (value_1, value_2) values (%s, %s)" usql = u"insert into issue364 (value_1, value_2) values (%s, %s)" values = [pymysql.Binary(b"\x00\xff\x00"), u"\xe4\xf6\xfc"] # test single insert and select cur = conn.cursor() cur.execute(sql, args=values) cur.execute("select * from issue364") self.assertEqual(cur.fetchone(), tuple(values)) # test single insert unicode query cur.execute(usql, args=values) # test multi insert and select cur.executemany(sql, args=(values, values, values)) cur.execute("select * from issue364") for row in cur.fetchall(): self.assertEqual(row, tuple(values)) # test multi insert with unicode query cur.executemany(usql, args=(values, values, values)) def test_issue_363(self): """ Test binary / geometry types. """ conn = pymysql.connect(charset="utf8", **self.databases[0]) self.safe_create_table( conn, "issue363", "CREATE TABLE issue363 ( " "id INTEGER PRIMARY KEY, geom LINESTRING NOT NULL, " "SPATIAL KEY geom (geom)) " "ENGINE=MyISAM default charset=utf8") cur = conn.cursor() query = ("INSERT INTO issue363 (id, geom) VALUES" "(1998, GeomFromText('LINESTRING(1.1 1.1,2.2 2.2)'))") # From MySQL 5.7, ST_GeomFromText is added and GeomFromText is deprecated. if self.mysql_server_is(conn, (5, 7, 0)): with self.assertWarns(pymysql.err.Warning) as cm: cur.execute(query) else: cur.execute(query) # select WKT query = "SELECT AsText(geom) FROM issue363" if self.mysql_server_is(conn, (5, 7, 0)): with self.assertWarns(pymysql.err.Warning) as cm: cur.execute(query) else: cur.execute(query) row = cur.fetchone() self.assertEqual(row, ("LINESTRING(1.1 1.1,2.2 2.2)", )) # select WKB query = "SELECT AsBinary(geom) FROM issue363" if self.mysql_server_is(conn, (5, 7, 0)): with self.assertWarns(pymysql.err.Warning) as cm: cur.execute(query) else: cur.execute(query) row = cur.fetchone() self.assertEqual(row, (b"\x01\x02\x00\x00\x00\x02\x00\x00\x00" b"\x9a\x99\x99\x99\x99\x99\xf1?" b"\x9a\x99\x99\x99\x99\x99\xf1?" b"\x9a\x99\x99\x99\x99\x99\x01@" b"\x9a\x99\x99\x99\x99\x99\x01@", )) # select internal binary cur.execute("SELECT geom FROM issue363") row = cur.fetchone() # don't assert the exact internal binary value, as it could # vary across implementations self.assertTrue(isinstance(row[0], bytes)) def test_issue_491(self): """ Test warning propagation """ conn = pymysql.connect(charset="utf8", **self.databases[0]) with warnings.catch_warnings(): # Ignore all warnings other than pymysql generated ones warnings.simplefilter("ignore") warnings.simplefilter("error", category=pymysql.Warning) # verify for both buffered and unbuffered cursor types for cursor_class in (cursors.Cursor, cursors.SSCursor): c = conn.cursor(cursor_class) try: c.execute("SELECT CAST('124b' AS SIGNED)") c.fetchall() except pymysql.Warning as e: # Warnings should have errorcode and string message, just like exceptions self.assertEqual(len(e.args), 2) self.assertEqual(e.args[0], 1292) self.assertTrue(isinstance(e.args[1], text_type)) else: self.fail("Should raise Warning") finally: c.close()