Let's get started.
Date Generation
We often need to generate dates in batches. There are plenty of ways to do it; here are two snippets.
Get the dates of the past N days
import datetime

def get_nday_list(n):
    before_n_days = []
    for i in range(1, n + 1)[::-1]:
        before_n_days.append(str(datetime.date.today() - datetime.timedelta(days=i)))
    return before_n_days

a = get_nday_list(30)
print(a)
Output:
['2021-12-23', '2021-12-24', '2021-12-25', '2021-12-26', '2021-12-27', '2021-12-28', '2021-12-29', '2021-12-30', '2021-12-31', '2022-01-01', '2022-01-02', '2022-01-03', '2022-01-04', '2022-01-05', '2022-01-06', '2022-01-07', '2022-01-08', '2022-01-09', '2022-01-10', '2022-01-11', '2022-01-12', '2022-01-13', '2022-01-14', '2022-01-15', '2022-01-16', '2022-01-17', '2022-01-18', '2022-01-19', '2022-01-20', '2022-01-21']
Generate the dates within a given range
import datetime

def create_assist_date(datestart=None, dateend=None):
    # build a helper list of dates
    if datestart is None:
        datestart = '2016-01-01'
    if dateend is None:
        dateend = datetime.datetime.now().strftime('%Y-%m-%d')
    # convert the strings to date objects
    datestart = datetime.datetime.strptime(datestart, '%Y-%m-%d')
    dateend = datetime.datetime.strptime(dateend, '%Y-%m-%d')
    date_list = []
    date_list.append(datestart.strftime('%Y-%m-%d'))
    while datestart < dateend:
        # move forward one day at a time and collect each date as a string
        datestart += datetime.timedelta(days=+1)
        date_list.append(datestart.strftime('%Y-%m-%d'))
    return date_list

print(create_assist_date('2021-12-27', '2021-12-30'))
Output:
['2021-12-27', '2021-12-28', '2021-12-29', '2021-12-30']
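Beyond these two snippets, pandas can build the same kind of list in a single call. This is a minimal extra sketch, not part of the original two snippets, and assumes pandas is installed:

import pandas as pd

# date_range produces a DatetimeIndex covering the whole span (inclusive)
date_list = pd.date_range('2021-12-27', '2021-12-30').strftime('%Y-%m-%d').tolist()
print(date_list)  # ['2021-12-27', '2021-12-28', '2021-12-29', '2021-12-30']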
Saving Data to CSV
Saving data to CSV is an extremely common task; here is a way of writing it that I personally like.
import os

def save_data(data, date):
    # write a header row only if the file does not exist yet
    if not os.path.exists(r'2021data%s.csv' % date):
        with open("2021data%s.csv" % date, "a+", encoding='utf-8') as f:
            f.write("标题,热度,时间,url\n")
            for i in data:
                title = i["title"]
                extra = i["extra"]
                time = i['time']
                url = i["url"]
                row = '{},{},{},{}'.format(title, extra, time, url)
                f.write(row)
                f.write('\n')
    else:
        with open("2021data%s.csv" % date, "a+", encoding='utf-8') as f:
            for i in data:
                title = i["title"]
                extra = i["extra"]
                time = i['time']
                url = i["url"]
                row = '{},{},{},{}'.format(title, extra, time, url)
                f.write(row)
                f.write('\n')
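A quick usage sketch to show the expected shape of data. The sample record below is made up purely for illustration; the real records come from the requests examples later on:

# Hypothetical sample record with the keys save_data expects
sample = [
    {"title": "demo title", "extra": "100", "time": "2021-12-27", "url": "https://example.com"},
]
save_data(sample, "2021-12-27")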
Pyecharts with a Background Color
Pyecharts, a fine Python implementation of Echarts, is popular with many developers. When drawing with Pyecharts, a pleasant background color also makes the chart look much better.
Taking a rose-type pie chart as an example, we change the background color by injecting a snippet of JavaScript.
from pyecharts import options as opts
from pyecharts.charts import Pie
from pyecharts.commons.utils import JsCode

def pie_rosetype(data) -> Pie:
    # JavaScript gradient used as the chart background
    background_color_js = (
        "new echarts.graphic.LinearGradient(0, 0, 0, 1, "
        "[{offset: 0, color: '#c86589'}, {offset: 1, color: '#06a7ff'}], false)"
    )
    c = (
        Pie(init_opts=opts.InitOpts(bg_color=JsCode(background_color_js)))
        .add(
            "",
            data,
            radius=["30%", "75%"],
            center=["45%", "50%"],
            rosetype="radius",
            label_opts=opts.LabelOpts(formatter="{b}: {c}"),
        )
        .set_global_opts(title_opts=opts.TitleOpts(title=""))
    )
    return c
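To actually see the chart, pass in (name, value) pairs and render to an HTML file. The sample data and file name below are placeholders for illustration only:

# Hypothetical sample data: a list of (name, value) pairs
sample = [("A", 40), ("B", 25), ("C", 20), ("D", 15)]
pie_rosetype(sample).render("rose_pie.html")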
Calling the requests Library
Reportedly, requests is the most widely used third-party library in the Python world, which says a lot about its standing!
Sending a GET request
import requests

url = "https://www.example.com"  # placeholder; replace with the real target URL
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
    'cookie': 'some_cookie'
}
response = requests.request("GET", url, headers=headers)
Sending a POST request
import requests

url = "https://www.example.com"  # placeholder; replace with the real target URL
payload = {}
files = []
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
    'cookie': 'some_cookie'
}
response = requests.request("POST", url, headers=headers, data=payload, files=files)
Looping requests on some condition, for example over the dates generated earlier
def get_data(mydate):
    date_list = create_assist_date(mydate)
    url = "https://test.test"
    files = []
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
        'cookie': ''
    }
    for d in date_list:
        payload = {'p': '10',
                   'day': d,
                   'nodeid': '1',
                   't': 'itemsbydate',
                   'c': 'node'}
        # page through each day's results until no items come back
        for i in range(1, 100):
            payload['p'] = str(i)
            print("get data of %s in page %s" % (d, str(i)))
            response = requests.request("POST", url, headers=headers, data=payload, files=files)
            items = response.json()['data']['items']
            if items:
                save_data(items, d)
            else:
                break
Working with Databases in Python
Working with Redis
Connect to Redis
import redis

def redis_conn_pool():
    pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
    rd = redis.Redis(connection_pool=pool)
    return rd
Write to Redis
from redis_conn import redis_conn_pool

rd = redis_conn_pool()
rd.set('test_data', 'mytest')
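As a quick sanity check (an addition to the original snippet), the value can be read straight back:

# decode_responses=True means get() returns a str rather than bytes
print(rd.get('test_data'))  # mytest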
Working with MongoDB
Connect to MongoDB
from pymongo import MongoClient

conn = MongoClient("mongodb://%s:%s@ipaddress:49974/mydb" % ('username', 'password'))
db = conn.mydb
mongo_collection = db.mydata
Bulk-insert data
res = requests.get(url, params=query).json()
commentList = res['data']['commentList']
mongo_collection.insert_many(commentList)
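To confirm the insert landed, a simple read-back works; this is an extra sketch, not part of the original snippet:

# count the documents and peek at a few of them
print(mongo_collection.count_documents({}))
for doc in mongo_collection.find().limit(3):
    print(doc)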
Working with MySQL
Connect to MySQL
import MySQLdb

# open a database connection
db = MySQLdb.connect("localhost", "testuser", "test123", "TESTDB", charset='utf8')

# get a cursor with cursor()
cursor = db.cursor()
Execute SQL statements
# run a SQL statement with execute()
cursor.execute("SELECT VERSION()")

# fetch a single row with fetchone()
data = cursor.fetchone()
print("Database version : %s" % data)

# close the database connection
db.close()
Output:
Database version : 5.0.45
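Writes follow the same pattern, except they must be committed and should of course run before db.close(). The table and columns below are made up purely to illustrate parameterized statements:

# Hypothetical table/columns, shown only to illustrate a parameterized write
cursor.execute(
    "INSERT INTO news (title, url) VALUES (%s, %s)",
    ("demo title", "https://example.com"),
)
db.commit()  # the write is not visible until committed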
Organizing Local Files
File-organizing needs vary a lot; what's shared here is merging several local CSV files into a single file.
import pandas as pd
import os

df_list = []
for i in os.listdir():
    if "csv" in i:
        # assumes the date is the last underscore-separated chunk of the file name
        day = i.split('.')[0].split('_')[-1]
        df = pd.read_csv(i)
        df['day'] = day
        df_list.append(df)

df = pd.concat(df_list, axis=0)
df.to_csv("total.txt", index=False)
Multithreading Code
There are many ways to write multithreaded code as well; just pick whichever you are most familiar and comfortable with.
import threading
import time

exitFlag = 0

class myThread(threading.Thread):
    def __init__(self, threadID, name, delay):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.delay = delay

    def run(self):
        print("Starting thread: " + self.name)
        print_time(self.name, self.delay, 5)
        print("Exiting thread: " + self.name)

def print_time(threadName, delay, counter):
    while counter:
        if exitFlag:
            # bail out early if the exit flag has been set
            return
        time.sleep(delay)
        print("%s: %s" % (threadName, time.ctime(time.time())))
        counter -= 1

# create new threads
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# start the threads and wait for them to finish
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("Exiting main thread")
Asynchronous Code
Crawling a website asynchronously
import asyncio
import aiohttp
import aiofiles

async def get_html(session, url):
    try:
        async with session.get(url=url, timeout=8) as resp:
            if not resp.status // 100 == 2:
                print(resp.status)
                print("error while fetching", url)
            else:
                text = await resp.text(encoding='utf-8')
                return text
    except Exception as e:
        print("request failed:", e)
        # retry once on failure and propagate the result
        return await get_html(session, url)
Once the request is asynchronous, the file writing that goes with it needs to be asynchronous too; in other words, async in one place means async everywhere.
async def download(title_list, content_list):
    async with aiofiles.open('{}.txt'.format(title_list[0]), 'a',
                             encoding='utf-8') as f:
        await f.write('{}'.format(str(content_list)))
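Neither coroutine runs by itself, so a small driver is needed to create the session and schedule them. This is a minimal sketch under assumed inputs: the URL and the title/content passed to download() are placeholders, while get_html and download are the functions defined above:

async def main():
    url = "https://www.example.com"  # placeholder crawl target
    async with aiohttp.ClientSession() as session:
        text = await get_html(session, url)
        if text:
            # hypothetical title/content, just to exercise download()
            await download(["demo_page"], [text[:200]])

asyncio.run(main())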