包括关系数据库:sqlite,mysql,mssql
非关系数据库:MongoDB,Redis
1. 连接Sqlite
import sqlite3 import traceback try: # 如果表不存在,就创建 with sqlite3.connect('test.db') as conn: print("Opened database successfully") # 删除表 conn.execute("DROP TABLE IF EXISTS COMPANY") # 创建表 sql = """ CREATE TABLE IF NOT EXISTS COMPANY (ID INTEGER PRIMARY KEY AUTOINCREMENT, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL); """ conn.execute(sql) print("create table successfully") # 添加数据 conn.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES (?, ?, ?, ? )", [('Paul', 32, 'California', 20000.00), ('Allen', 25, 'Texas', 15000.00), ('Teddy', 23, 'Norway', 20000.00), ('Mark', 25, 'Rich-Mond ', 65000.00), ('David', 27, 'Texas', 85000.00), ('Kim', 22, 'South-Hall', 45000.00), ('James', 24, 'Houston', 10000.00)]) # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) # VALUES ( 'Paul', 32, 'California', 20000.00 )") # # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) # VALUES ('Allen', 25, 'Texas', 15000.00 )") # # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) # VALUES ('Teddy', 23, 'Norway', 20000.00 )") # # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) # VALUES ( 'Mark', 25, 'Rich-Mond ', 65000.00 )") # # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) # VALUES ( 'David', 27, 'Texas', 85000.00 )"); # # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) # VALUES ( 'Kim', 22, 'South-Hall', 45000.00 )") # # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) # VALUES ( 'James', 24, 'Houston', 10000.00 )") # 提交,否则重新运行程序时,表中无数据 conn.commit() print("insert successfully") # 查询表 sql = """ select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY """ result = conn.execute(sql) for row in result: print("-" * 50) #输出50个-,作为分界线 print("%-10s %s" % ("id", row[0])) # 字段名固定10位宽度,并且左对齐 print("%-10s %s" % ("name", row[1])) print("%-10s %s" % ("age", row[2])) print("%-10s %s" % ("address", row[3])) print("%-10s %.2f" % ("salary", row[4])) # or # print('{:10s} {:.2f}'.format("salary", row[4])) except sqlite3.Error as e: print("sqlite3 Error:", e) traceback.print_exc()
2.连接mysql
相关推荐:《python视频教程》
2.2 使用MySQLdb
2.1使用mysqldb库中的_mysql
import MySQLdb from contextlib import closing import traceback try: # 获取一个数据库连接 with closing(MySQLdb.connect(host='localhost', user='root', passwd='root', db='test', port=3306,charset='utf8')) as conn: print("connect database successfully") with closing(conn.cursor()) as cur: # 删除表 cur.execute("DROP TABLE IF EXISTS COMPANY") # 创建表 sql = """ CREATE TABLE IF NOT EXISTS COMPANY (ID INTEGER PRIMARY KEY NOT NULL auto_increment, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL); """ cur.execute(sql) print("create table successfully") # 添加数据 # 在一个conn.execute里面里面执行多个sql语句是非法的 cur.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )", [('Paul', 32, 'California', 20000.00), ('Allen', 25, 'Texas', 15000.00), ('Teddy', 23, 'Norway', 20000.00), ('Mark', 25, 'Rich-Mond ', 65000.00), ('David', 27, 'Texas', 85000.00), ('Kim', 22, 'South-Hall', 45000.00), ('James', 24, 'Houston', 10000.00)]) # 提交,否则重新运行程序时,表中无数据 conn.commit() print("insert successfully") # 查询表 sql = """ select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY """ cur.execute(sql) for row in cur.fetchall(): print("-" * 50) #输出50个-,作为分界线 print("%-10s %s" % ("id", row[0])) # 字段名固定10位宽度,并且左对齐 print("%-10s %s" % ("name", row[1])) print("%-10s %s" % ("age", row[2])) print("%-10s %s" % ("address", row[3])) print("%-10s %s" % ("salary", row[4])) except MySQLdb.Error as e: print("Mysql Error:", e) traceback.print_exc() # 打印错误栈信息
2.2 使用MySQLdb
import MySQLdb from contextlib import closing import traceback try: # 获取一个数据库连接 with closing(MySQLdb.connect(host='localhost', user='root', passwd='root', db='test', port=3306,charset='utf8')) as conn: print("connect database successfully") with closing(conn.cursor()) as cur: # 删除表 cur.execute("DROP TABLE IF EXISTS COMPANY") # 创建表 sql = """ CREATE TABLE IF NOT EXISTS COMPANY (ID INTEGER PRIMARY KEY NOT NULL auto_increment, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL); """ cur.execute(sql) print("create table successfully") # 添加数据 # 在一个conn.execute里面里面执行多个sql语句是非法的 cur.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )", [('Paul', 32, 'California', 20000.00), ('Allen', 25, 'Texas', 15000.00), ('Teddy', 23, 'Norway', 20000.00), ('Mark', 25, 'Rich-Mond ', 65000.00), ('David', 27, 'Texas', 85000.00), ('Kim', 22, 'South-Hall', 45000.00), ('James', 24, 'Houston', 10000.00)]) # 提交,否则重新运行程序时,表中无数据 conn.commit() print("insert successfully") # 查询表 sql = """ select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY """ cur.execute(sql) for row in cur.fetchall(): print("-" * 50) #输出50个-,作为分界线 print("%-10s %s" % ("id", row[0])) # 字段名固定10位宽度,并且左对齐 print("%-10s %s" % ("name", row[1])) print("%-10s %s" % ("age", row[2])) print("%-10s %s" % ("address", row[3])) print("%-10s %s" % ("salary", row[4])) except MySQLdb.Error as e: print("Mysql Error:", e) traceback.print_exc() # 打印错误栈信息
2.3使用pymysql
2.1和2.2节使用MySQLdb,不支持Python3.x
pymysql对Python2.x和Python3.x的支持都比较好
import pymysql from contextlib import closing import traceback try: # 获取一个数据库连接,with关键字 表示退出时,conn自动关闭 # with 嵌套上一层的with 要使用closing() with closing(pymysql.connect(host='localhost', user='root', passwd='root', db='test', port=3306, charset='utf8')) as conn: print("connect database successfully") # 获取游标,with关键字 表示退出时,cur自动关闭 with conn.cursor() as cur: # 删除表 cur.execute("DROP TABLE IF EXISTS COMPANY") # 创建表 sql = """ CREATE TABLE IF NOT EXISTS COMPANY (ID INTEGER PRIMARY KEY NOT NULL auto_increment, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL); """ cur.execute(sql) print("create table successfully") # 添加数据 # 在一个conn.execute里面里面执行多个sql语句是非法的 cur.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )", [('Paul', 32, 'California', 20000.00), ('Allen', 25, 'Texas', 15000.00), ('Teddy', 23, 'Norway', 20000.00), ('Mark', 25, 'Rich-Mond ', 65000.00), ('David', 27, 'Texas', 85000.00), ('Kim', 22, 'South-Hall', 45000.00), ('James', 24, 'Houston', 10000.00)]) # 提交,否则重新运行程序时,表中无数据 conn.commit() print("insert successfully") # 查询表 sql = """ select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY """ cur.execute(sql) for row in cur.fetchall(): print("-" * 50) #输出50个-,作为分界线 print("%-10s %s" % ("id", row[0])) # 字段名固定10位宽度,并且左对齐 print("%-10s %s" % ("name", row[1])) print("%-10s %s" % ("age", row[2])) print("%-10s %s" % ("address", row[3])) print("%-10s %s" % ("salary", row[4])) except pymysql.Error as e: print("Mysql Error:", e) traceback.print_exc()
3.连接mssql
import pymssql from contextlib import closing try: # 先要保证数据库中有test数据库 # 获取一个数据库连接,with关键字 表示退出时,conn自动关闭 # with 嵌套上一层的with 要使用closing() with closing(pymssql.connect(host='192.168.100.114', user='sa', password='sa12345', database='test', port=1433, charset='utf8')) as conn: print("connect database successfully") # 获取游标,with关键字 表示退出时,cur自动关闭 with conn.cursor() as cur: # 删除表 cur.execute( '''if exists (select 1 from sys.objects where name='COMPANY' and type='U') drop table COMPANY''') # 创建表 sql = """ CREATE TABLE COMPANY (ID INT IDENTITY(1,1) PRIMARY KEY NOT NULL , NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL); """ cur.execute(sql) print("create table successfully") # 添加数据 # 在一个conn.execute里面里面执行多个sql语句是非法的 cur.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )", [('Paul', 32, 'California', 20000.00), ('Allen', 25, 'Texas', 15000.00), ('Teddy', 23, 'Norway', 20000.00), ('Mark', 25, 'Rich-Mond', 65000.00), ('David', 27, 'Texas', 85000.00), ('Kim', 22, 'South-Hall', 45000.00), ('James', 24, 'Houston', 10000.00)]) # 提交,否则重新运行程序时,表中无数据 conn.commit() print("insert successfully") # 查询表 sql = """ select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY """ cur.execute(sql) for row in cur.fetchall(): print("-" * 50) #输出50个-,作为分界线 print("%-10s %s" % ("id", row[0])) # 字段名固定10位宽度,并且左对齐 print("%-10s %s" % ("name", row[1])) print("%-10s %s" % ("age", row[2])) print("%-10s %s" % ("address", row[3])) print("%-10s %s" % ("salary", row[4])) except pymssql.Error as e: print("mssql Error:", e) # traceback.print_exc()
4.连接MongoDB
import pymongo from pymongo.mongo_client import MongoClient import pymongo.errors import traceback try: # 连接到 mongodb 服务 mongoClient = MongoClient('localhost', 27017) # 连接到数据库 mongoDatabase = mongoClient.test print("connect database successfully") # 获取集合 mongoCollection = mongoDatabase.COMPANY # 移除所有数据 mongoCollection.remove() # 添加数据 mongoCollection.insert_many([{"Name": "Paul", "Age": "32", "Address": "California", "Salary": "20000.00"}, {"Name": "Allen", "Age": "25", "Address": "Texas", "Salary": "15000.00"}, {"Name": "Teddy", "Age": "23", "Address": "Norway", "Salary": "20000.00"}, {"Name": "Mark", "Age": "25", "Address": "Rich-Mond", "Salary": "65000.00"}, {"Name": "David", "Age": "27", "Address": "Texas", "Salary": "85000.00"}, {"Name": "Kim", "Age": "22", "Address": "South-Hall", "Salary": "45000.00"}, {"Name": "James", "Age": "24", "Address": "Houston", "Salary": "10000.00"}, ]) #获取集合中的值 for row in mongoCollection.find(): print("-" * 50) #输出50个-,作为分界线 print("%-10s %s" % ("_id", row['_id'])) # 字段名固定10位宽度,并且左对齐 print("%-10s %s" % ("name", row['Name'])) print("%-10s %s" % ("age", row['Age'])) print("%-10s %s" % ("address", row['Address'])) print("%-10s %s" % ("salary", row['Salary'])) print(' ') # 使id自增 mongoCollection.remove() # 创建计数表 mongoDatabase.counters.save({"_id": "people_id", "sequence_value": 0}) # 创建存储过程 mongoDatabase.system_js.getSequenceValue = '''function getSequenceValue(sequenceName){ var sequenceDocument = db.counters.findAndModify({ query: {_id: sequenceName}, update: {$inc:{sequence_value: 1}}, new:true }); return sequenceDocument.sequence_value; }''' mongoCollection.insert_many( [{"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Paul", "Age": "32", "Address": "California", "Salary": "20000.00"}, {"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Allen", "Age": "25", "Address": "Texas", "Salary": "15000.00"}, {"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Teddy", "Age": "23", "Address": "Norway", "Salary": "20000.00"}, {"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Mark", "Age": "25", "Address": "Rich-Mond", "Salary": "65000.00"}, {"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "David", "Age": "27", "Address": "Texas", "Salary": "85000.00"}, {"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Kim", "Age": "22", "Address": "South-Hall", "Salary": "45000.00"}, {"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "James", "Age": "24", "Address": "Houston", "Salary": "10000.00"}, ]) for row in mongoCollection.find(): print("-" * 50) # 输出50个-,作为分界线 print("%-10s %s" % ("_id", int(row['_id']))) # 字段名固定10位宽度,并且左对齐 print("%-10s %s" % ("name", row['Name'])) print("%-10s %s" % ("age", row['Age'])) print("%-10s %s" % ("address", row['Address'])) print("%-10s %s" % ("salary", row['Salary'])) except pymongo.errors.PyMongoError as e: print("mongo Error:", e) traceback.print_exc()
5.连接Redis
5.1使用redis
import redis r = redis.Redis(host='localhost', port=6379, db=0, password="12345") print("connect", r.ping()) # 看信息 info = r.info() # or 查看部分信息 # info = r.info("Server") #输出信息 items = info.items() for i, (key, value) in enumerate(items): print("item %s----%s:%s" % (i, key, value)) # 删除键和对应的值 r.delete("company") # 可以一次性push一条或多条数据 r.rpush("company", {"id": 1, "Name": "Paul", "Age": "32", "Address": "California", "Salary": "20000.00"}, {"id": 2, "Name": "Allen", "Age": "25", "Address": "Texas", "Salary": "15000.00"}, {"id": 3, "Name": "Teddy", "Age": "23", "Address": "Norway", "Salary": "20000.00"}) r.rpush("company", {"id": 4, "Name": "Mark", "Age": "25", "Address": "Rich-Mond", "Salary": "65000.00"}) r.rpush("company", {"id": 5, "Name": "David", "Age": "27", "Address": "Texas", "Salary": "85000.00"}) r.rpush("company", {"id": 6, "Name": "Kim", "Age": "22", "Address": "South-Hall", "Salary": "45000.00"}) r.rpush("company", {"id": 7, "Name": "James", "Age": "24", "Address": "Houston", "Salary": "10000.00"}) # eval用来将dict格式的字符串转换成dict for row in map(lambda x: eval(x), r.lrange("company", 0, r.llen("company"))): print("-" * 50) # 输出50个-,作为分界线 print("%-10s %s" % ("_id", row['id'])) # 字段名固定10位宽度,并且左对齐 print("%-10s %s" % ("name", row['Name'])) print("%-10s %s" % ("age", row['Age'])) print("%-10s %s" % ("address", row['Address'])) print("%-10s %s" % ("salary", row['Salary'])) # 关闭当前连接 # r.shutdown() #这个是关闭redis服务端
5.2使用pyredis
import pyredis r = pyredis.Client(host='localhost', port=6379, database=0, password="12345") print("connect", r.ping().decode("utf-8")) # 看信息 # info = r.execute("info").decode() # or 查看部分信息 info = r.execute("info", "Server").decode() #输出信息 print(info) # 删除键和对应的值 r.delete("company") # 可以一次性push一条或多条数据 r.rpush("company", '''{"id": 1, "Name": "Paul", "Age": "32", "Address": "California", "Salary": "20000.00"}''', '''{"id": 2, "Name": "Allen", "Age": "25", "Address": "Texas", "Salary": "15000.00"}''', '''{"id": 3, "Name": "Teddy", "Age": "23", "Address": "Norway", "Salary": "20000.00"}''') r.rpush("company", '''{"id": 4, "Name": "Mark", "Age": "25", "Address": "Rich-Mond", "Salary": "65000.00"}''') r.rpush("company", '''{"id": 5, "Name": "David", "Age": "27", "Address": "Texas", "Salary": "85000.00"}''') r.rpush("company", '''{"id": 6, "Name": "Kim", "Age": "22", "Address": "South-Hall", "Salary": "45000.00"}''') r.rpush("company", '''{"id": 7, "Name": "James", "Age": "24", "Address": "Houston", "Salary": "10000.00"}''') # eval用来将dict格式的字符串转换成dict for row in map(lambda x: eval(x), r.lrange("company", 0, r.llen("company"))): print("-" * 50) # 输出50个-,作为分界线 print("%-10s %s" % ("_id", row['id'])) # 字段名固定10位宽度,并且左对齐 print("%-10s %s" % ("name", row['Name'])) print("%-10s %s" % ("age", row['Age'])) print("%-10s %s" % ("address", row['Address'])) print("%-10s %s" % ("salary", row['Salary'])) # 关闭当前连接 r.close()