# coding: utf-8 import datetime import json import time import warnings from unittest2 import SkipTest from pymysql import util import pymysql.cursors from pymysql.tests import base from pymysql.err import ProgrammingError __all__ = ["TestConversion", "TestCursor", "TestBulkInserts"] class TestConversion(base.PyMySQLTestCase): def test_datatypes(self): """ test every data type """ conn = self.connections[0] c = conn.cursor() c.execute("create table test_datatypes (b bit, i int, l bigint, f real, s varchar(32), u varchar(32), bb blob, d date, dt datetime, ts timestamp, td time, t time, st datetime)") try: # insert values v = (True, -3, 123456789012, 5.7, "hello'\" world", u"Espa\xc3\xb1ol", "binary\x00data".encode(conn.charset), datetime.date(1988,2,2), datetime.datetime(2014, 5, 15, 7, 45, 57), datetime.timedelta(5,6), datetime.time(16,32), time.localtime()) c.execute("insert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", v) c.execute("select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes") r = c.fetchone() self.assertEqual(util.int2byte(1), r[0]) self.assertEqual(v[1:10], r[1:10]) self.assertEqual(datetime.timedelta(0, 60 * (v[10].hour * 60 + v[10].minute)), r[10]) self.assertEqual(datetime.datetime(*v[-1][:6]), r[-1]) c.execute("delete from test_datatypes") # check nulls c.execute("insert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", [None] * 12) c.execute("select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes") r = c.fetchone() self.assertEqual(tuple([None] * 12), r) c.execute("delete from test_datatypes") # check sequences type for seq_type in (tuple, list, set, frozenset): c.execute("insert into test_datatypes (i, l) values (2,4), (6,8), (10,12)") seq = seq_type([2,6]) c.execute("select l from test_datatypes where i in %s order by i", (seq,)) r = c.fetchall() self.assertEqual(((4,),(8,)), r) c.execute("delete from test_datatypes") finally: c.execute("drop table test_datatypes") def test_dict(self): """ test dict escaping """ conn = self.connections[0] c = conn.cursor() c.execute("create table test_dict (a integer, b integer, c integer)") try: c.execute("insert into test_dict (a,b,c) values (%(a)s, %(b)s, %(c)s)", {"a":1,"b":2,"c":3}) c.execute("select a,b,c from test_dict") self.assertEqual((1,2,3), c.fetchone()) finally: c.execute("drop table test_dict") def test_string(self): conn = self.connections[0] c = conn.cursor() c.execute("create table test_dict (a text)") test_value = "I am a test string" try: c.execute("insert into test_dict (a) values (%s)", test_value) c.execute("select a from test_dict") self.assertEqual((test_value,), c.fetchone()) finally: c.execute("drop table test_dict") def test_integer(self): conn = self.connections[0] c = conn.cursor() c.execute("create table test_dict (a integer)") test_value = 12345 try: c.execute("insert into test_dict (a) values (%s)", test_value) c.execute("select a from test_dict") self.assertEqual((test_value,), c.fetchone()) finally: c.execute("drop table test_dict") def test_blob(self): """test binary data""" data = bytes(bytearray(range(256)) * 4) conn = self.connections[0] self.safe_create_table( conn, "test_blob", "create table test_blob (b blob)") with conn.cursor() as c: c.execute("insert into test_blob (b) values (%s)", (data,)) c.execute("select b from test_blob") self.assertEqual(data, c.fetchone()[0]) def test_untyped(self): """ test conversion of null, empty string """ conn = self.connections[0] c = conn.cursor() c.execute("select null,''") self.assertEqual((None,u''), c.fetchone()) c.execute("select '',null") self.assertEqual((u'',None), c.fetchone()) def test_timedelta(self): """ test timedelta conversion """ conn = self.connections[0] c = conn.cursor() c.execute("select time('12:30'), time('23:12:59'), time('23:12:59.05100'), time('-12:30'), time('-23:12:59'), time('-23:12:59.05100'), time('-00:30')") self.assertEqual((datetime.timedelta(0, 45000), datetime.timedelta(0, 83579), datetime.timedelta(0, 83579, 51000), -datetime.timedelta(0, 45000), -datetime.timedelta(0, 83579), -datetime.timedelta(0, 83579, 51000), -datetime.timedelta(0, 1800)), c.fetchone()) def test_datetime_microseconds(self): """ test datetime conversion w microseconds""" conn = self.connections[0] if not self.mysql_server_is(conn, (5, 6, 4)): raise SkipTest("target backend does not support microseconds") c = conn.cursor() dt = datetime.datetime(2013, 11, 12, 9, 9, 9, 123450) c.execute("create table test_datetime (id int, ts datetime(6))") try: c.execute( "insert into test_datetime values (%s, %s)", (1, dt) ) c.execute("select ts from test_datetime") self.assertEqual((dt,), c.fetchone()) finally: c.execute("drop table test_datetime") class TestCursor(base.PyMySQLTestCase): # this test case does not work quite right yet, however, # we substitute in None for the erroneous field which is # compatible with the DB-API 2.0 spec and has not broken # any unit tests for anything we've tried. #def test_description(self): # """ test description attribute """ # # result is from MySQLdb module # r = (('Host', 254, 11, 60, 60, 0, 0), # ('User', 254, 16, 16, 16, 0, 0), # ('Password', 254, 41, 41, 41, 0, 0), # ('Select_priv', 254, 1, 1, 1, 0, 0), # ('Insert_priv', 254, 1, 1, 1, 0, 0), # ('Update_priv', 254, 1, 1, 1, 0, 0), # ('Delete_priv', 254, 1, 1, 1, 0, 0), # ('Create_priv', 254, 1, 1, 1, 0, 0), # ('Drop_priv', 254, 1, 1, 1, 0, 0), # ('Reload_priv', 254, 1, 1, 1, 0, 0), # ('Shutdown_priv', 254, 1, 1, 1, 0, 0), # ('Process_priv', 254, 1, 1, 1, 0, 0), # ('File_priv', 254, 1, 1, 1, 0, 0), # ('Grant_priv', 254, 1, 1, 1, 0, 0), # ('References_priv', 254, 1, 1, 1, 0, 0), # ('Index_priv', 254, 1, 1, 1, 0, 0), # ('Alter_priv', 254, 1, 1, 1, 0, 0), # ('Show_db_priv', 254, 1, 1, 1, 0, 0), # ('Super_priv', 254, 1, 1, 1, 0, 0), # ('Create_tmp_table_priv', 254, 1, 1, 1, 0, 0), # ('Lock_tables_priv', 254, 1, 1, 1, 0, 0), # ('Execute_priv', 254, 1, 1, 1, 0, 0), # ('Repl_slave_priv', 254, 1, 1, 1, 0, 0), # ('Repl_client_priv', 254, 1, 1, 1, 0, 0), # ('Create_view_priv', 254, 1, 1, 1, 0, 0), # ('Show_view_priv', 254, 1, 1, 1, 0, 0), # ('Create_routine_priv', 254, 1, 1, 1, 0, 0), # ('Alter_routine_priv', 254, 1, 1, 1, 0, 0), # ('Create_user_priv', 254, 1, 1, 1, 0, 0), # ('Event_priv', 254, 1, 1, 1, 0, 0), # ('Trigger_priv', 254, 1, 1, 1, 0, 0), # ('ssl_type', 254, 0, 9, 9, 0, 0), # ('ssl_cipher', 252, 0, 65535, 65535, 0, 0), # ('x509_issuer', 252, 0, 65535, 65535, 0, 0), # ('x509_subject', 252, 0, 65535, 65535, 0, 0), # ('max_questions', 3, 1, 11, 11, 0, 0), # ('max_updates', 3, 1, 11, 11, 0, 0), # ('max_connections', 3, 1, 11, 11, 0, 0), # ('max_user_connections', 3, 1, 11, 11, 0, 0)) # conn = self.connections[0] # c = conn.cursor() # c.execute("select * from mysql.user") # # self.assertEqual(r, c.description) def test_fetch_no_result(self): """ test a fetchone() with no rows """ conn = self.connections[0] c = conn.cursor() c.execute("create table test_nr (b varchar(32))") try: data = "pymysql" c.execute("insert into test_nr (b) values (%s)", (data,)) self.assertEqual(None, c.fetchone()) finally: c.execute("drop table test_nr") def test_aggregates(self): """ test aggregate functions """ conn = self.connections[0] c = conn.cursor() try: c.execute('create table test_aggregates (i integer)') for i in range(0, 10): c.execute('insert into test_aggregates (i) values (%s)', (i,)) c.execute('select sum(i) from test_aggregates') r, = c.fetchone() self.assertEqual(sum(range(0,10)), r) finally: c.execute('drop table test_aggregates') def test_single_tuple(self): """ test a single tuple """ conn = self.connections[0] c = conn.cursor() self.safe_create_table( conn, 'mystuff', "create table mystuff (id integer primary key)") c.execute("insert into mystuff (id) values (1)") c.execute("insert into mystuff (id) values (2)") c.execute("select id from mystuff where id in %s", ((1,),)) self.assertEqual([(1,)], list(c.fetchall())) c.close() def test_json(self): args = self.databases[0].copy() args["charset"] = "utf8mb4" conn = pymysql.connect(**args) if not self.mysql_server_is(conn, (5, 7, 0)): raise SkipTest("JSON type is not supported on MySQL <= 5.6") self.safe_create_table(conn, "test_json", """\ create table test_json ( id int not null, json JSON not null, primary key (id) );""") cur = conn.cursor() json_str = u'{"hello": "こんにちは"}' cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,)) cur.execute("SELECT `json` from `test_json` WHERE `id`=42") res = cur.fetchone()[0] self.assertEqual(json.loads(res), json.loads(json_str)) cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,)) res = cur.fetchone()[0] self.assertEqual(json.loads(res), json.loads(json_str)) class TestBulkInserts(base.PyMySQLTestCase): cursor_type = pymysql.cursors.DictCursor def setUp(self): super(TestBulkInserts, self).setUp() self.conn = conn = self.connections[0] c = conn.cursor(self.cursor_type) # create a table ane some data to query self.safe_create_table(conn, 'bulkinsert', """\ CREATE TABLE bulkinsert ( id int(11), name char(20), age int, height int, PRIMARY KEY (id) ) """) def _verify_records(self, data): conn = self.connections[0] cursor = conn.cursor() cursor.execute("SELECT id, name, age, height from bulkinsert") result = cursor.fetchall() self.assertEqual(sorted(data), sorted(result)) def test_bulk_insert(self): conn = self.connections[0] cursor = conn.cursor() data = [(0, "bob", 21, 123), (1, "jim", 56, 45), (2, "fred", 100, 180)] cursor.executemany("insert into bulkinsert (id, name, age, height) " "values (%s,%s,%s,%s)", data) self.assertEqual( cursor._last_executed, bytearray( b"insert into bulkinsert (id, name, age, height) values " b"(0,'bob',21,123),(1,'jim',56,45),(2,'fred',100,180)")) cursor.execute('commit') self._verify_records(data) def test_bulk_insert_multiline_statement(self): conn = self.connections[0] cursor = conn.cursor() data = [(0, "bob", 21, 123), (1, "jim", 56, 45), (2, "fred", 100, 180)] cursor.executemany("""insert into bulkinsert (id, name, age, height) values (%s, %s , %s, %s ) """, data) self.assertEqual(cursor._last_executed.strip(), bytearray(b"""insert into bulkinsert (id, name, age, height) values (0, 'bob' , 21, 123 ),(1, 'jim' , 56, 45 ),(2, 'fred' , 100, 180 )""")) cursor.execute('commit') self._verify_records(data) def test_bulk_insert_single_record(self): conn = self.connections[0] cursor = conn.cursor() data = [(0, "bob", 21, 123)] cursor.executemany("insert into bulkinsert (id, name, age, height) " "values (%s,%s,%s,%s)", data) cursor.execute('commit') self._verify_records(data) def test_issue_288(self): """executemany should work with "insert ... on update" """ conn = self.connections[0] cursor = conn.cursor() data = [(0, "bob", 21, 123), (1, "jim", 56, 45), (2, "fred", 100, 180)] cursor.executemany("""insert into bulkinsert (id, name, age, height) values (%s, %s , %s, %s ) on duplicate key update age = values(age) """, data) self.assertEqual(cursor._last_executed.strip(), bytearray(b"""insert into bulkinsert (id, name, age, height) values (0, 'bob' , 21, 123 ),(1, 'jim' , 56, 45 ),(2, 'fred' , 100, 180 ) on duplicate key update age = values(age)""")) cursor.execute('commit') self._verify_records(data) def test_warnings(self): con = self.connections[0] cur = con.cursor() with warnings.catch_warnings(record=True) as ws: warnings.simplefilter("always") cur.execute("drop table if exists no_exists_table") self.assertEqual(len(ws), 1) self.assertEqual(ws[0].category, pymysql.Warning) if u"no_exists_table" not in str(ws[0].message): self.fail("'no_exists_table' not in %s" % (str(ws[0].message),))