python项目——新闻管理系统

2023-04-28,,

DAO(Data Access Object) 数据访问对象是一个面向对象的数据库接口
控制台的输入输出都是再app.py里面完成的

 mysql_db.py

import mysql.connector.pooling

__config = {
"host":"192.168.111.153",
"port":3306,
"user":"quan",
"password":"2004",
"database":"vega"
} try:
pool = mysql.connector.pooling.MySQLConnectionPool(
**__config,
pool_size=10
)
except Exception as e:
print(e)

 news_dao.py

from db.mysql_db import pool

class NewDao:
#查询待审批新闻列表
def search_unreview_list(self,page):
try:
con = pool.get_connection()
cursor = con.cursor()
sql = "SELECT n.id,n.title,n.state,t.type,u.username " \
"FROM t_news AS n JOIN t_type AS t ON n.type_id = t.id " \
"JOIN t_user AS u ON n.editor_id = u.id " \
"WHERE n.state = %s " \
"ORDER BY n.create_time DESC " \
"LIMIT %s,%s"
cursor.execute(sql, ("待审批",(page-1)*10,10))
result = cursor.fetchall()
return result
except Exception as e:
print(e)
finally:
if "con" in dir():
con.close() #查询待审批新闻的总页数
def search_unreview_count_page(self):
try:
con = pool.get_connection()
cursor = con.cursor()
sql = "SELECT CEIL(COUNT(*)/10) FROM t_news WHERE state = %s"
cursor.execute(sql, ["待审批"])
count_page = cursor.fetchone()[0]
return count_page
except Exception as e:
print(e)
finally:
if "con" in dir():
con.close() #审批新闻
def update_unreview_news(self,id):
try:
con = pool.get_connection()
con.start_transaction()
cursor = con.cursor()
sql = "UPDATE t_news SET state = %s WHERE id = %s"
cursor.execute(sql, ("已审批",id))
con.commit()
except Exception as e:
if "con" == dir():
con.rollback()
print(e)
finally:
if "con" in dir():
con.close() #查询新闻列表
def search_list(self,page):
try:
con = pool.get_connection()
cursor = con.cursor()
sql = "SELECT n.id,n.title,n.state,t.type,u.username " \
"FROM t_news AS n JOIN t_type AS t ON n.type_id = t.id " \
"JOIN t_user AS u ON n.editor_id = u.id " \
"ORDER BY n.create_time DESC " \
"LIMIT %s,%s"
cursor.execute(sql, ((page - 1) * 10, 10))
result = cursor.fetchall()
return result
except Exception as e:
print(e)
finally:
if "con" in dir():
con.close() #查询新闻总页数
def search_count_page(self):
try:
con = pool.get_connection()
cursor = con.cursor()
sql = "SELECT CEIL(COUNT(*)/10) FROM t_news "
cursor.execute(sql)
count_page = cursor.fetchone()[0]
return count_page
except Exception as e:
print(e)
finally:
if "con" in dir():
con.close() #删除新闻
def delete_by_id(self,id):
try:
con = pool.get_connection()
con.start_transaction()
cursor = con.cursor()
sql = "DELETE FROM t_news WHERE id = %s"
cursor.execute(sql, [id])
con.commit()
except Exception as e:
if "con" == dir():
con.rollback()
print(e)
finally:
if "con" in dir():
con.close()

role_dao.py

from db.mysql_db import pool

class RoleDao:
# 查询角色列表
def search_list(self):
try:
con = pool.get_connection()
cursor = con.cursor()
sql = "SELECT id,role FROM t_role "
cursor.execute(sql)
result = cursor.fetchall()
return result
except Exception as e:
print(e)
finally:
if "con" in dir():
con.close()

user_dao.py

from db.mysql_db import pool

# con = pool.get_connection()
# cursor = con.cursor()
# sql = "SELECT COUNT(*) FROM t_user WHERE username = %s AND " \
# "AES_DECRYPT(UNHEX(password),'HelloWorld') = %s"
# cursor.execute(sql,("admin","123456"))
# count = cursor.fetchone()[0]
# print(count)
# con = pool.get_connection()
# cursor = con.cursor()
# sql = "SELECT r.role FROM t_user AS u JOIN t_role AS r ON " \
# "u.role_id = r.id WHERE u.username = %s"
# cursor.execute(sql, ["admin"])
# role = cursor.fetchone()[0]
# print(role)
class UserDao:
#验证用户登陆信息
def login(self,username,password):
global con
try:
con = pool.get_connection()
cursor = con.cursor()
sql = "SELECT COUNT(*) FROM t_user WHERE username = %s AND " \
"AES_DECRYPT(UNHEX(password),'HelloWorld') = %s"
cursor.execute(sql,(username,password))
count = cursor.fetchone()[0]
return True if count == 1 else False except Exception as e:
print(e)
finally:
if "con" in dir():#因为这个项目使可以执行一次程序,多次登陆#需要给回连接给连接池,通过调用close()即可
con.close() def serch_user_role(self,username):
#查询用户角色
try:
con = pool.get_connection()
cursor = con.cursor()
sql = "SELECT r.role FROM t_user AS u JOIN t_role AS r ON " \
"u.role_id = r.id WHERE u.username = %s"
cursor.execute(sql, [username])
role = cursor.fetchone()[0]
return role
except Exception as e:
print(e)
finally:
if "con" in dir():
con.close() #插入新用户
def insert_user(self,username,password,email,role_id):
try:
con = pool.get_connection()
con.start_transaction()
cursor = con.cursor()
sql = "INSERT INTO t_user(username,password,email,role_id)" \
"VALUES (%s,HEX(AES_ENCRYPT(%s,'HelloWorld')),%s,%s)"
cursor.execute(sql, (username,password,email,role_id))
con.commit()
except Exception as e:
if "con" == dir():
con.rollback()
print(e)
finally:
if "con" in dir():
con.close() #查询用户总页数
def search_count_page(self):
try:
con = pool.get_connection()
cursor = con.cursor()
sql = "SELECT CEIL(COUNT(*)/10) FROM t_user "
cursor.execute(sql)
count_page = cursor.fetchone()[0]
return count_page
except Exception as e:
print(e)
finally:
if "con" in dir():
con.close() #插叙用户的分页记录
def search_list(self,page):
try:
con = pool.get_connection()
cursor = con.cursor()
sql = "SELECT u.id,u.username,r.role,u.email FROM t_user AS u JOIN t_role AS r" \
" ON u.role_id = r.id " \
"ORDER BY u.id " \
"LIMIT %s,%s"
cursor.execute(sql,((page - 1)*10,10))
result = cursor.fetchall()
return result
except Exception as e:
print(e)
finally:
if "con" in dir():
con.close() #修改用户信息
def update(self,id,username,password,email,role_id):
try:
con = pool.get_connection()
con.start_transaction()
cursor = con.cursor()
sql = "UPDATE t_user SET " \
"username = %s, " \
"password = HEX(AES_ENCRYPT(%s,'HelloWord')), " \
"email = %s, " \
"role_id = %s " \
"WHERE id = %s "
cursor.execute(sql, (username, password, email, role_id,id))
con.commit()
except Exception as e:
if "con" == dir():
con.rollback()
print(e)
finally:
if "con" in dir():
con.close() #删除用户
def delete_by_id(self,id):
try:
con = pool.get_connection()
con.start_transaction()
cursor = con.cursor()
sql = "DELETE FROM t_user WHERE id = %s "
cursor.execute(sql, [id])
con.commit()
except Exception as e:
if "con" == dir():
con.rollback()
print(e)
finally:
if "con" in dir():
con.close()

news_service.py

from db.news_dao import NewDao

class NewsService:
__news_dao = NewDao() #查询待审批新闻列表
def search_unreview_list(self,page):
result = self.__news_dao.search_unreview_list(page)
return result # 查询待审批新闻的总页数
def search_unreview_count_page(self):
count_page = self.__news_dao.search_unreview_count_page()
return count_page # 审批新闻
def update_unreview_news(self, id):
self.__news_dao.update_unreview_news(id) #查询新闻列表
def search_list(self,page):
result = self.__news_dao.search_list(page)
return result # 查询新闻总数
def search_count_page(self):
count_page = self.__news_dao.search_count_page()
return count_page # 删除新闻
def delete_by_id(self, id):
self.__news_dao.delete_by_id(id)

role_service.py

from db.role_dao import RoleDao

class RoleService:
__role_dao = RoleDao() # 查询角色列表
def search_list(self):
result = self.__role_dao.search_list()
return result

user_service.py

from db.user_dao import UserDao

class UserService:
__user_dao = UserDao()
#验证用户登陆
def login(self,username,password):
result = self.__user_dao.login(username,password)
return result
#查询用户角色
def search_user_role(self,username):
role = self.__user_dao.serch_user_role(username)
return role # 插入新用户
def insert_user(self, username, password, email, role_id):
self.__user_dao.insert_user(username, password, email, role_id) #查询用户总页数
def search_count_page(self):
count_page = self.__user_dao.search_count_page()
return count_page #插叙用户的分页记录
def search_list(self,page):
result = self.__user_dao.search_list(page)
return result # 修改用户信息
def update(self, id, username, password, email, role_id):
self.__user_dao.update(id, username, password, email, role_id) # 删除用户
def delete_by_id(self, id):
self.__user_dao.delete_by_id(id)

app.py

from colorama import Fore,Style
from getpass import getpass #获取用输入密码
from service.user_service import UserService
from service.news_service import NewsService
from service.role_service import RoleService
import os #为了清空控制台内容
import sys
import time __user_service = UserService()
__news_service = NewsService()
__role_service = RoleService() while True:
os.system("cls")
print(Fore.LIGHTBLUE_EX,"\n\t+++++++++++++++++++")
print(Fore.LIGHTBLUE_EX, "\n\t欢迎使用新闻管理系统")
print(Fore.LIGHTBLUE_EX, "\n\t+++++++++++++++++++")
print(Fore.LIGHTGREEN_EX,"\n\t1)登陆系统")
print(Fore.LIGHTGREEN_EX, "\n\t2)退出系统")
print(Style.RESET_ALL)
opt = input("\n\t输入你的操作编号:")
if opt == "1":
username = input("\n\t输入你的用户名:")
password = getpass("\n\t输入你的密码:")
result = __user_service.login(username,password)
#登陆成功
if result == True:
#查询角色
role = __user_service.search_user_role(username)
while True:
os.system("cls")
if role == "新闻编辑":
print("test")
elif role == "管理员":
print(Fore.LIGHTGREEN_EX,"\n\t1)新闻管理")
print(Fore.LIGHTGREEN_EX, "\n\t2)用户管理")
print(Fore.LIGHTRED_EX, "\n\tback)退出登陆")
print(Fore.LIGHTRED_EX, "\n\texit)退出系统")
print(Style.RESET_ALL)
opt = input("\n\t输入你的操作编号:")
if opt == "1":
while True:
os.system("cls")
print(Fore.LIGHTGREEN_EX, "\n\t1)审批新闻")
print(Fore.LIGHTGREEN_EX, "\n\t2)删除新闻")
print(Fore.LIGHTRED_EX, "\n\tback)返回")
print(Style.RESET_ALL)
opt = input("\n\t输入你的操作编号:")
if opt == "1":
page = 1
while True:
os.system("cls")
count_page = __news_service.search_unreview_count_page()
result = __news_service.search_unreview_list(page)
for index in range(len(result)):
one = result[index]
print(Fore.LIGHTBLUE_EX,"\n\t{0}\t{1}\t{2}\t{3}".format(index+1,one[1],one[2],one[3]))
print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
print(Fore.LIGHTBLUE_EX,"\n\t{0}/{1}".format(page,count_page))
print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
print(Fore.LIGHTRED_EX, "\n\tback)返回")
print(Fore.LIGHTRED_EX, "\n\tprev)上一页")
print(Fore.LIGHTRED_EX, "\n\tnext)下一页")
print(Style.RESET_ALL)
opt = input("\n\t输入你的操作编号:")
if opt == "back":
break
elif opt == "prev" and page > 1:
page -=1
elif opt == "next" and page < count_page:
page +=1
elif int(opt) >= 1 and int(opt) <= 10:
news_id = result[int(opt) - 1][0]
__news_service.update_unreview_news(news_id)
elif opt == "2":
page = 1
while True:
os.system("cls")
count_page = __news_service.search_count_page()
result = __news_service.search_list(page)
for index in range(len(result)):
one = result[index]
print(Fore.LIGHTBLUE_EX, "\n\t{0}\t{1}\t{2}\t{3}".format(index + 1, one[1], one[2], one[3]))
print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
print(Fore.LIGHTBLUE_EX, "\n\t{0}/{1}".format(page, count_page))
print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
print(Fore.LIGHTRED_EX, "\n\tback)返回")
print(Fore.LIGHTRED_EX, "\n\tprev)上一页")
print(Fore.LIGHTRED_EX, "\n\tnext)下一页")
print(Style.RESET_ALL)
opt = input("\n\t输入你的操作编号:")
if opt == "back":
break
elif opt == "prev" and page > 1:
page -= 1
elif opt == "next" and page < count_page:
page += 1
elif int(opt) >= 1 and int(opt) <= 10:
news_id = result[int(opt) - 1][0]
__news_service.delete_by_id(news_id)
elif opt == "back": break
elif opt == "2":
while True:
os.system("cls")
print(Fore.LIGHTGREEN_EX, "\n\t1)添加用户")
print(Fore.LIGHTGREEN_EX, "\n\t2)修改用户")
print(Fore.LIGHTGREEN_EX, "\n\t3)删除用户")
print(Fore.LIGHTRED_EX, "\n\tback)返回")
print(Style.RESET_ALL)
opt = input("\n\t输入你的操作编号:")
if opt == "back":
break
elif opt == "1":
os.system("cls")
username = input("\n\t你要添加的用户名字")
password = getpass("\n\t输入新用户密码")
repassword = getpass("\n\t再次输入新用户密码")
if password != repassword:
print(Fore.LIGHTRED_EX,"\n\t两次密码不一致(3秒自动返回)")
time.sleep(3)
continue
email = input("\n\t新用户邮箱:")
result = __role_service.search_list()
for index in range(len(result)):
one = result[index]
print(Fore.LIGHTBLUE_EX,"\n\t{0}.{1}".format(index+1,one[1]))
print(Style.RESET_ALL)
opt = input("\n\t新用户的角色编号:")
role_id = result[int(opt)-1][0]
__user_service.insert_user(username,password,email,role_id)
print("\n\t保存成功(3秒自动返回)")
time.sleep(3) elif opt == "2": page = 1
while True:
os.system("cls")
count_page = __user_service.search_count_page()
result = __user_service.search_list(page)
for index in range(len(result)):
one = result[index]
print(Fore.LIGHTBLUE_EX,
"\n\t{0}\t{1}\t{2}".format(index + 1, one[1], one[2]))
print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
print(Fore.LIGHTBLUE_EX, "\n\t{0}/{1}".format(page, count_page))
print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
print(Fore.LIGHTRED_EX, "\n\tback)返回")
print(Fore.LIGHTRED_EX, "\n\tprev)上一页")
print(Fore.LIGHTRED_EX, "\n\tnext)下一页")
print(Style.RESET_ALL)
opt = input("\n\t输入你的操作编号:")
if opt == "back":
break
elif opt == "prev" and page > 1:
page -= 1
elif opt == "next" and page < count_page:
page += 1
elif int(opt) >= 1 and int(opt) <= 10:
os.system("cls")
user_id = result[int(opt)-1][0]
username = input("\n\t你要修改成的用户名字")
password = getpass("\n\t输入新密码")
repassword = getpass("\n\t再次输入新密码")
if password != repassword:
print(Fore.LIGHTRED_EX,"\n\t两次密码不一致(3秒自动返回)")
time.sleep(3)
print(Style.RESET_ALL)
break
email = input("\n\t新用户邮箱:")
result = __role_service.search_list()
for index in range(len(result)):
one = result[index]
print(Fore.LIGHTBLUE_EX, "\n\t{0}.{1}".format(index + 1, one[1]))
print(Style.RESET_ALL)
opt = input("\n\t新用户的角色编号:")
role_id = result[int(opt) - 1][0]
opt = input("\n\t是否保存(Y/N)")
if opt.upper() == "Y":
__user_service.update(user_id,username,password,email,role_id)
print("\n\t保存成功(3秒自动返回)")
time.sleep(3)
elif opt == "3": page = 1
while True:
os.system("cls")
count_page = __user_service.search_count_page()
result = __user_service.search_list(page)
for index in range(len(result)):
one = result[index]
print(Fore.LIGHTBLUE_EX,
"\n\t{0}\t{1}\t{2}".format(index + 1, one[1], one[2]))
print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
print(Fore.LIGHTBLUE_EX, "\n\t{0}/{1}".format(page, count_page))
print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
print(Fore.LIGHTRED_EX, "\n\tback)返回")
print(Fore.LIGHTRED_EX, "\n\tprev)上一页")
print(Fore.LIGHTRED_EX, "\n\tnext)下一页")
print(Style.RESET_ALL)
opt = input("\n\t输入你的操作编号:")
if opt == "back":
break
elif opt == "prev" and page > 1:
page -= 1
elif opt == "next" and page < count_page:
page += 1
elif int(opt) >= 1 and int(opt) <= 10:
os.system("cls")
user_id = result[int(opt) - 1][0]
__user_service.delete_by_id(user_id)
print("\n\t删除成功(3秒自动返回)")
time.sleep(3) elif opt == "back":
break
elif opt == "exit":
sys.exit(0)
else:
print("\n\t登陆失败,3秒自动返回")
time.sleep(3)
elif opt == "2":
sys.exit(0)#0代表安全退出,等数据释放等

python项目——新闻管理系统的相关教程结束。

《python项目——新闻管理系统.doc》

下载本文的Word格式文档,以方便收藏与打印。