Halo
发布于 2022-05-06 / 114 阅读 / 0 评论 / 0 点赞

python操作mysql

pymysql 默认非自动提交

  • pymysql在连接数据库时,参数autocommit默认为False,因此修改数据后需要手动调用commit()提交
# Open the connection; the autocommit behaviour is passed in explicitly.
connection = pymysql.connect(
    host=host,
    user=username,
    password=password,
    database=database,
    charset=charset,
    port=port,
    autocommit=autocommit,
)
  • pymysql的连接或游标都不是线程安全的

连接池连接

  • 连接池定义
# patch_socket() is called before the remaining imports so that the modules
# imported below (including pymysql) are loaded after the socket module has
# been monkey-patched by gevent.
from gevent import monkey
monkey.patch_socket()
import logging

import gevent
from gevent.queue import Queue
import pymysql as db  # the driver is referred to as ``db`` in this module

# Log at DEBUG level; this module logs through the "connection_pool" logger.
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger("connection_pool")


class ConnectionPool:
    """Fixed-size pool of pymysql connections stored in a gevent queue.

    Constructing the pool opens up to ``max_pool_size`` connections and then
    enters the health-check loop.  Unless ``test_run`` is true that loop never
    returns, so the constructor blocks the calling greenlet.

    Args:
        db_config: Mapping that provides ``'user'``, ``'password'``,
            ``'host'`` and ``'port'``.  A missing port falls back to
            ``DEFAULT_PORT``.
        time_to_sleep: Seconds to wait between health checks and before the
            pool is rebuilt after a failed check.
        test_run: When true, the health check performs a single pass and, if
            that pass fails, switches the port to ``DEFAULT_PORT`` before the
            pool is rebuilt.
        max_pool_size: Number of connections the pool tries to hold.
    """

    DEFAULT_PORT = 3306

    def __init__(self, db_config, time_to_sleep=30, test_run=False,
                 max_pool_size=20):
        self.username = db_config.get('user')
        self.password = db_config.get('password')
        self.host = db_config.get('host')
        port = db_config.get('port')
        self.port = int(port) if port is not None else self.DEFAULT_PORT
        self.max_pool_size = max_pool_size
        self.test_run = test_run
        self.pool = None
        self.time_to_sleep = time_to_sleep
        self._initialize_pool()

    def get_initialized_connection_pool(self):
        """Return the queue that currently holds the pooled connections."""
        return self.pool

    def _connect(self):
        """Open a new connection using the configured host and credentials."""
        return db.connect(host=self.host,
                          user=self.username,
                          password=self.password,
                          port=self.port)

    @staticmethod
    def _close_quietly(conn):
        """Close ``conn``, ignoring driver errors such as "already closed"."""
        try:
            conn.close()
        except db.Error:
            LOGGER.debug("Ignoring error while closing a connection",
                         exc_info=True)

    def _fill_pool(self):
        """Replace the pool with a new queue and fill it with connections.

        Connections still sitting in the previous queue are closed so they
        are not leaked.  Filling stops at the first connection failure; the
        health check is responsible for retrying later.
        """
        stale_pool = self.pool
        self.pool = Queue(maxsize=self.max_pool_size)
        if stale_pool is not None:
            while stale_pool.qsize() > 0:
                self._close_quietly(stale_pool.get_nowait())

        for _ in range(self.max_pool_size):
            try:
                conn = self._connect()
            except db.OperationalError as e:
                LOGGER.error("Cannot initialize connection pool - retrying in {} seconds".format(self.time_to_sleep))
                LOGGER.exception(e)
                break
            self.pool.put_nowait(conn)

    def _initialize_pool(self):
        """Fill the pool, then run the health-check loop."""
        self._fill_pool()
        self._check_for_connection_loss()

    def _re_initialize_pool(self):
        """Wait ``time_to_sleep`` seconds, then rebuild the pool.

        Only the pool is rebuilt here.  The health-check loop that called
        this method keeps running, so no nested loop is started.
        """
        gevent.sleep(self.time_to_sleep)
        self._fill_pool()

    def _check_for_connection_loss(self):
        """Periodically ping one pooled connection and rebuild on failure.

        Runs forever, sleeping ``time_to_sleep`` seconds between passes,
        unless ``test_run`` is set, in which case it returns after one pass.
        """
        while True:
            conn = None
            if self.pool.qsize() > 0:
                conn = self.pool.get()

            if self._ping(conn):
                # Only a connection taken from the pool goes back into it;
                # an empty pool must not receive a ``None`` entry.
                if conn is not None:
                    self.pool.put_nowait(conn)
            else:
                if conn is not None:
                    self._close_quietly(conn)
                if self.test_run:
                    self.port = self.DEFAULT_PORT
                self._re_initialize_pool()

            if self.test_run:
                break
            gevent.sleep(self.time_to_sleep)

    def _ping(self, conn):
        """Return True if ``select 1`` succeeds on ``conn``.

        When ``conn`` is None a temporary connection is opened for the check
        and closed again before returning.
        """
        probe = None
        try:
            if conn is None:
                probe = conn = self._connect()
            cursor = conn.cursor()
            try:
                cursor.execute('select 1;')
                LOGGER.debug(cursor.fetchall())
            finally:
                cursor.close()
            return True

        except (db.OperationalError, db.InterfaceError) as e:
            LOGGER.warning('Cannot connect to mysql - retrying in {} seconds'.format(self.time_to_sleep))
            LOGGER.exception(e)
            return False

        finally:
            if probe is not None:
                self._close_quietly(probe)
  • 连接池使用
# Test module for ConnectionPool (pytest compatible).
import logging

from src.py.ConnectionPool import ConnectionPool

logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger("test_connection_pool")


def test_get_initialized_connection_pool():
    """The pool should hold 20 connections once construction returns."""
    # The pool is configured with port 3305; in a test run the port is
    # switched back to 3306, so the queue is expected to end up with 20
    # entries.  It would be nicer to avoid relying on the test_run hack.
    db_settings = dict(user='root', password='', host='127.0.0.1', port=3305)
    conn_pool = ConnectionPool(db_settings, time_to_sleep=5, test_run=True)
    assert conn_pool.get_initialized_connection_pool().qsize() == 20

ping 自动重连

from pymysql import connect
from pymysql.cursors import DictCursor

# Open the connection (the argument values shown here are placeholders).
connection = connect(
    host='host',
    port='port',
    user='user',
    password='password',
    db='db',
    cursorclass=DictCursor,
)

# Obtain a cursor from the connection.
cursor = connection.cursor()

# Check the connection; with reconnect=True a lost connection is re-opened.
connection.ping(reconnect=True)

# Run the query.
cursor.execute(query)

操作

  1. 创建数据表
# Obtain a cursor object from the open connection.
cursor = connection.cursor()

# Drop the EMPLOYEE table first if it already exists.
cursor.execute('DROP TABLE IF EXISTS EMPLOYEE')

# DDL statement that creates the EMPLOYEE table.
create_table_sql = """CREATE TABLE EMPLOYEE (
         FIRST_NAME  CHAR(20) NOT NULL,
         LAST_NAME  CHAR(20),
         AGE INT,  
         SEX CHAR(1),
         INCOME FLOAT )"""

cursor.execute(create_table_sql)
  2. 插入数据
# Obtain a cursor from the open connection.
cursor = connection.cursor()

# SQL INSERT statement.
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
         LAST_NAME, AGE, SEX, INCOME)
         VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:
    # Run the statement, then commit so the insert is persisted.
    cursor.execute(sql)
    connection.commit()
except Exception:
    # Roll back on failure.  ``except Exception`` (rather than a bare
    # ``except``) lets KeyboardInterrupt and SystemExit propagate.
    connection.rollback()
finally:
    # Close the connection even if the rollback itself raises.
    connection.close()
  3. 查询数据
# Obtain a cursor from the open connection.
cursor = connection.cursor()

# SQL SELECT statement.  The threshold is passed to execute() as a bound
# parameter instead of being formatted into the SQL text.
sql = "SELECT * FROM EMPLOYEE WHERE INCOME > %s"
try:
    cursor.execute(sql, (1000,))
    # Fetch every matching row.
    results = cursor.fetchall()
    for row in results:
        fname, lname, age, sex, income = row[:5]
        print("fname=%s,lname=%s,age=%s,sex=%s,income=%s" %
              (fname, lname, age, sex, income))
except Exception:
    # ``except Exception`` (rather than a bare ``except``) lets
    # KeyboardInterrupt and SystemExit propagate.
    print("Error: unable to fetch data")
finally:
    # Close the connection whether or not the query succeeded.
    connection.close()
  4. 更新数据
# Obtain a cursor from the open connection.
cursor = connection.cursor()

# SQL UPDATE statement.  The value is passed to execute() as a bound
# parameter instead of being formatted into the SQL text.
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = %s"
try:
    cursor.execute(sql, ('M',))
    # Commit on the connection object.  The original called ``db.commit()``,
    # which is not the connection, so the update was always rolled back.
    connection.commit()
except Exception:
    # Roll back on failure.
    connection.rollback()
finally:
    # Close the connection even if the rollback itself raises.
    connection.close()
  5. 删除数据
# Obtain a cursor from the open connection.
cursor = connection.cursor()

# SQL DELETE statement.  The age limit is passed to execute() as a bound
# parameter instead of being formatted into the SQL text.
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s"
try:
    cursor.execute(sql, (20,))
    # Commit the change.
    connection.commit()
except Exception:
    # Roll back on failure.  ``except Exception`` (rather than a bare
    # ``except``) lets KeyboardInterrupt and SystemExit propagate.
    connection.rollback()
finally:
    # Close the connection even if the rollback itself raises.
    connection.close()
  6. 执行事务
# SQL statement that deletes records.  The age limit is passed to execute()
# as a bound parameter instead of being formatted into the SQL text.
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s"
try:
    cursor.execute(sql, (20,))
    # Commit the transaction.
    connection.commit()
except Exception:
    # Roll the transaction back on failure.  ``except Exception`` (rather
    # than a bare ``except``) lets KeyboardInterrupt and SystemExit propagate.
    connection.rollback()

评论