Python爬虫
爬虫概念
1.robots协议
也叫robots.txt,是存放在网站根目录下的文本文件,用来告诉搜索引擎该网站哪些内容是不应该被抓取的,哪些是可以抓取的。
如https://www.csdn.net/robots.txt
User-agent: *
Disallow: /scripts
Disallow: /public
Disallow: /css/
Disallow: /images/
Disallow: /content/
Disallow: /ui/
Disallow: /js/
Disallow: /scripts/
Disallow: /article_preview.html*
Disallow: /tag/
Disallow: /*?*
Disallow: /link/
Sitemap: https://www.csdn.net/sitemap-aggpage-index.xml
Sitemap: https://www.csdn.net/article/sitemap.txt
2.常见的反爬虫措施
1.请求头校验
一般网站会对请求头进行校验,比如Host,UA,Content-Type字段等,模拟请求的时候,这些常见的请求头最好是带上。
2.IP访问次数控制
同一个IP地址短时间内大量发起请求,会引起IP限制,解决方法是用代理IP,或者构建自己的代理IP池。
3.接口请求频率限制
有的网站会控制接口访问的频率,比如有些查询接口,控制两三秒访问一次。
4.接口访问次数限制
每天限制某个IP或账号访问接口的次数,达到上限后出现二次验证或者直接封账号/IP.比如登录接口
5.行为认证
请求次数过多会出现人工认证,如图片验证码,滑动认证,点击认证等,可以对接打码平台。
6,自动化环境检测
selenium自动化工具有的网站会检测出来,大部分可以通过下面两种方式跳过检测,下面两种方式无法处理的话,还可以尝试把页面改为移动端页面(手机模式),最后还有一种方法就是代理服务器拦截修改js代码,把检测selenium的js修改掉。
options = webdriver.ChromeOptions()
# 躲避部分网站selenium检测
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option("useAutomationExtension", False)
driver = webdriver.Chrome(executable_path=chromedriver_path, options=options)
# 躲避部分网站selenium检测
script = "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": script})
对于移动端appium的检测,可以尝试替换为uiautomator2实现自动化
7.数据动态加载
有的数据不是通过html页面的接口请求返回的,抓包分析请求,找到正确的数据接口。
8.请求参数加密
网易云音乐的post请求的请求体就是前端经过js加密后计算得到的,需要逆向js代码
9.返回数据加密
需要逆向js代码,分析如何解密。还有一种像大众点评的评论,需要通过定位去找到文本。
10.动态更新cookies
华为手机云服务,每次请求接口都会重新设置cookies,并且请求头参数也需要跟着cookies一起变化
Python爬虫之requests库
一.发送请求
requests提供了http的所有基本请求方式:
import requests
r = requests.post("http://httpbin.org/post")
r = requests.put("http://httpbin.org/put")
r = requests.delete("http://httpbin.org/delete")
r = requests.head("http://httpbin.org/get")
r = requests.options("http://httpbin.org/get")
基本get请求中参数的传递:
# requests允许使用params关键字参数,以字典的形式来提供get请求url中的参数。
payload = {'key1': 'value1', 'key2': 'value2'}
r = requests.get("http://httpbin.org/get", params=payload)
print(r.url) # http://httpbin.org/get?key2=value2&key1=value1
# 字典中的value还可以以列表的形式传入
payload = {'key1': 'value1', 'key2': ['value2', 'value3']}
r = requests.get('http://httpbin.org/get', params=payload)
print(r.url)
http://httpbin.org/get?key1=value1&key2=value2&key2=value3
添加请求头headers
url = 'https://api.github.com/some/endpoint'
headers = {'user-agent': 'my-app/0.0.1'}
r = requests.get(url, headers=headers)
Post请求
payload = {'key1': 'value1', 'key2': 'value2'}
r = requests.post("http://httpbin.org/post", data=payload)
print(r.text)
# 可以为 data 参数传入一个元组列表
# 在表单中多个元素使用同一 key 的时候,这种方式尤其有效:
payload = (('key1', 'value1'), ('key1', 'value2'))
r = requests.post('http://httpbin.org/post', data=payload)
print(r.text)
{
...
"form": {
"key1": [
"value1",
"value2"
]
},
...
}
# post的为json对象
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
r = requests.post(url, json=payload)
超时设置:
requests.get('http://github.com', timeout=0.001)
二.响应内容
import requests
r = requests.get('https://github.com/timeline.json')
r.encoding='utf-8'
r.text
# [{"repository":{"open_issues":0,"url":"https://github.com/...
网页乱码问题:
# 查看网页编码
print(res.apparent_encoding)
# 设置编码
res.encoding = 'GB2312'
二进制响应内容(r.content)
from PIL import Image
from io import BytesIO
#BytesIO用于操作内存中的二进制数据
img=Image.open(BytesIO(r.content))
JSON响应内容(r.json())
import requests
r = requests.get('https://github.com/timeline.json')
r.json()
# [{u'repository': {u'open_issues': 0, u'url': 'https://github.com/...
响应状态码(r.status_code)
r = requests.get('http://httpbin.org/get')
r.status_code
200
响应头(r.headers)
r.headers
{
'content-encoding': 'gzip',
'transfer-encoding': 'chunked',
'connection': 'close',
'server': 'nginx/1.0.4',
'x-runtime': '148ms',
'etag': '"e1ca502697e5c9317743dc078f67693f"',
'content-type': 'application/json'
}
r.headers['Content-Type']
'application/json'
三.Cookies
如果某个响应中包含一些 cookie,你可以快速访问它们:
url = 'http://example.com/some/cookie/setting/url'
r = requests.get(url)
r.cookies['example_cookie_name']
# 'example_cookie_value'
要想发送你的cookies到服务器,可以使用 cookies 参数:
url = 'http://httpbin.org/cookies'
cookies = dict(cookies_are='working')
r = requests.get(url, cookies=cookies)
r.text
# '{"cookies": {"cookies_are": "working"}}'
四.会话
requests.Session()这样可以在会话中保留状态,保持cookie等
import requests
s = requests.Session()
s.headers.update({'x-test': 'true'})
r = s.get('http://httpbin.org/headers', headers={'x-test2': 'true'})
print(r.text)
五.代理
如果需要使用代理,你可以通过为任意请求方法提供 proxies 参数来配置单个请求
# http代理
import requests
proxies = {
"https": "http://41.118.132.69:4433"
}
r = requests.post("http://httpbin.org/post", proxies=proxies)
# socks代理
proxies = {
'http': 'socks5://user:pass@host:port',
'https': 'socks5://user:pass@host:port'
}
六.Prepared Request
构造requests.Request对象,将Request对象作为参数传入requests.Session()对象的prepare_request()方法中,最后通过Session对象的send()方法发送请求。
import requests
from requests import Request
url = 'http://httpbin.org/get'
# 创建Session对象
s = requests.Session()
# 构造Request对象
req = Request('GET',url)
# 将Request对象转换成 PreparedRequest对象
prepped = s.prepare_request(req)
# 利用Session对象的send()方法,发送PreparedRequest对象
res = s.send(prepped)
print(res.text)
print(type(prepped))
Python爬虫之BeautifulSoup
格式化输出
html_doc = """
<html><head><title>The Dormouse's story</title></head>
<body>
<p class="title"><b>The Dormouse's story</b></p>
<p class="story">Once upon a time there were three little sisters; and their names were
<a href="http://example.com/elsie" class="sister" id="link1">Elsie</a>,
<a href="http://example.com/lacie" class="sister" id="link2">Lacie</a> and
<a href="http://example.com/tillie" class="sister bro" id="link3">Tillie</a>;
and they lived at the bottom of a well.</p>
<p class="story">...</p>
"""
soup = BeautifulSoup(html_doc, 'html.parser')
print(soup.prettify())
find_all(name , attrs , recursive , string , **kwargs)
# 查找所有的a标签
res = soup.find_all('a')
# # 查找所有的a标签和p标签
res = soup.find_all(['a', 'p'])
# 查找class=title的p标签
res = soup.find_all('p', 'title')
# 指定属性查找 可支持字符串,正则表达式,或者函数
# 指定id查找元素
res = soup.find_all(id="link1")
# 指定href查找 [<a class="sister" href="http://example.com/elsie" id="link1">Elsie</a>]
res = soup.find_all(href=re.compile('elsie'))
# 指定多个属性查找
res = soup.find_all(id='link1', href=re.compile('elsie'))
# 指定多个属性查找 attrs参数
res = soup.find_all(attrs={'id': 'link1', 'href': re.compile('elsie')})
# 通过css搜索
res = soup.find_all(class_="sister bro")
# 通过函数过滤,查找类名长度大于6的元素
res = soup.find_all(class_=lambda x: x is not None and len(x) > 6)
# recursive参数,如果只想搜索直接子节点 recursive=False
res = soup.find_all('title', recursive=False)
# find_all() 方法的返回结果是值包含一个元素的列表
# 而find()方法直接返回第一个结果,没有则返回None.
res = soup.find('a')
CSS选择器
# 类查找
res = soup.select('.sister')
# ID查找
res = soup.select('#link1')
res = soup.select('a#link1')
# 通过是否存在某个属性查找
res = soup.select('a[href]')
# 指定属性值查找
res = soup.select('a[href="http://example.com/tillie"]')
# 查找返回第一个元素
res = soup.select_one('a[href]')
# 获取元素的属性值
res = soup.select_one('a[href]').get('href')
# 获取元素的文本
res = soup.select_one('a[href]').text
Selenium自动化
1.基础操作
from selenium import webdriver
import time
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
if __name__ == '__main__':
# 谷歌浏览器驱动
chromedriver_path = 'chromedriver72.exe'
options = webdriver.ChromeOptions()
# 躲避部分网站selenium检测
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option("useAutomationExtension", False)
driver = webdriver.Chrome(executable_path=chromedriver_path, options=options)
# 躲避部分网站selenium检测
script = "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": script})
# 浏览器最大化
driver.maximize_window()
url = 'https://www.python.org/'
driver.get(url)
# 显式等待
wait = WebDriverWait(driver, 20, 1)
# 在主页输入框搜索requests,并点击搜索
input_ = wait.until(EC.presence_of_element_located((By.ID, 'id-search-field')))
input_.send_keys('requests')
time.sleep(1)
btn = driver.find_element_by_xpath('//button[@title="Submit this Search"]')
btn.click()
time.sleep(10)
driver.close()
2.元素定位
查找单个元素
最常用的定位元素的两个方法是通过Xpath和id来定位。
- find_element_by_id
- find_element_by_xpath
查找多个元素
- find_elements_by_xpath
- find_elements_by_name
# 通过xpath查找元素
driver.find_element_by_xpath('//button[@title="Submit this Search"]')
# 通过id查找元素
driver.find_element_by_id('id-search-field')
获取元素内部内容
text = driver.find_element_by_xpath('//*[@id="ISDCaptcha"]/div[2]/div').get_attribute('innerHTML')
if '请绘制图中手势' in text:
print('出现行为认证')
获取元素指定属性的属性值
driver.find_element_by_xpath('//div[@id="find-step3-phone"]').get_attribute('style')
driver.find_element_by_xpath('//*[@id="imgVerifyCodeP"]').get_attribute('src')
切换到指定iframe
# 通过id或者名称
driver.switch_to.frame("iframeLoginIfm")
driver.switch_to.frame(0)
frame = driver.find_element_by_xpath('//div[@id="loginDiv"]/iframe')
driver.switch_to.frame(frame)
切换到指定窗口
driver.switch_to.window(browser.window_handles[1])
切换到alert弹窗
text = driver.switch_to.alert.text
if '图片验证码输入错误' in text:
print('图片验证码识别错误')
driver.switch_to.alert.accept()
3.元素交互
按钮点击
btn = driver.find_element_by_xpath('//div[@role="button"]/div/span/span')
btn.click()
执行js代码
style_ = driver.find_element_by_xpath('//*[@id="passport-login-pop"]').get_attribute('style')
style_ = style_.replace('display: none;', '')
if not style_:
style_ = 'left: 259px; top: 212px; z-index: 60001;'
js = 'document.getElementById("passport-login-pop").setAttribute("style","{}");'.format(style_)
driver.execute_script(js)
表单输入
input_ = driver.find_element_by_xpath('//input[@name="session[password]" and @dir="auto"]')
input_.send_keys('123qwe')
from selenium.webdriver.common.keys import Keys
input_.send_keys(Keys.BACK_SPACE)
页面滚动
driver.execute_script("""
(function () {
var y = document.body.scrollTop;
var step = 100;
window.scroll(0, y);
function f() {
if (y < document.body.scrollHeight) {
y += step;
window.scroll(0, y);
setTimeout(f, 50);
}
else {
window.scroll(0, y);
document.title += "scroll-done";
}
}
setTimeout(f, 1000);
})();
""")
模拟拖动
from selenium.webdriver.common.action_chains import ActionChains
def get_track(self, distance):
track = []
current = 0
mid = distance * 3 / 4
t = 0.2
v = 0
while current < distance:
if current < mid:
a = 2
else:
a = -3
v0 = v
v = v0 + a * t
move = v0 * t + 1 / 2 * a * t * t
current += move
track.append(round(move))
return track
# 模拟拖动
btn = wait.until(EC.presence_of_element_located((By.XPATH, xpath_)))
track = get_track(500)
action = ActionChains(browser)
action.click_and_hold(btn).perform()
action.reset_actions()
for i in track:
action.move_by_offset(xoffset=i, yoffset=0).perform()
action.reset_actions()
4.等待
显式等待
# 显式等待
wait = WebDriverWait(driver, 20, 1)
input_ = wait.until(EC.presence_of_element_located((By.ID, 'id-search-field')))
input_.send_keys('requests')
time.sleep(1)
隐式等待
from selenium import webdriver
driver = webdriver.Chrome()
# 隐式等待
driver.implicitly_wait(10)
driver.get('https://www.zhihu.com/explore')
logo = driver.find_element_by_id('zh-top-link-logo')
print(logo)
driver.close()
5.其他操作
解决页面加载时间过长问题
有时候页面有些静态文件加载比较耗时,selenium可以不需要等待页面全部加载完全在去查找元素
options = webdriver.ChromeOptions()
# 解决页面加载阻塞问题
options.set_capability('pageLoadStrategy', 'none')
driver = webdriver.Chrome(executable_path=self.chromedriver_path, options=options)
添加请求头
options.add_argument("user-agent={}".format('Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36'))
添加代理
socks5 = "socks5://{}:{}".format(socks5_proxy_ip, socks5_proxy_port)
options.add_argument("--proxy-server={}".format(socks5))
捕捉F12控制台中所有请求记录
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
d = DesiredCapabilities.CHROME
d['loggingPrefs'] = {'performance': 'ALL'}
d['goog:chromeOptions'] = {
'perfLoggingPrefs': {
'enableNetwork': True,
},
'w3c': False,
}
options.add_experimental_option('perfLoggingPrefs', {'enableNetwork': True})
options.add_experimental_option('w3c', False)
driver = webdriver.Chrome(executable_path=self.chromedriver_path, options=options, desired_capabilities=d)
# 保存log
log_list = []
for entry in driver.get_log('performance'):
log_list.append(entry)
屏幕截图,可以截取图片验证码加以识别
import win32con
import win32gui
import win32print
from win32api import GetSystemMetrics
from PIL import Image
def get_real_resolution():
"""获取真实的分辨率"""
hDC = win32gui.GetDC(0)
# 横向分辨率
w = win32print.GetDeviceCaps(hDC, win32con.DESKTOPHORZRES)
# 纵向分辨率
h = win32print.GetDeviceCaps(hDC, win32con.DESKTOPVERTRES)
return w, h
def get_screen_size():
"""获取缩放后的分辨率"""
w = GetSystemMetrics(0)
h = GetSystemMetrics(1)
return w, h
real_resolution = get_real_resolution()
screen_size = get_screen_size()
screen_scale_rate = round(real_resolution[0] / screen_size[0], 2)
pic_name = '***.png'
driver.save_screenshot(pic_name)
# 找到图片验证码元素
element = driver.find_element_by_xpath(xpath_)
left = element.location['x'] * screen_scale_rate
top = element.location['y'] * screen_scale_rate
right = (element.location['x'] + element.size['width']) * screen_scale_rate
bottom = (element.location['y'] + element.size['height']) * screen_scale_rate
im = Image.open(pic_name)
# 裁剪图片
im = im.crop((left, top, right, bottom))
im.save(pic_name)
# 把图片转成base64,利用打码平台接口识别
with open(pic_name, 'rb') as f:
code_img_base64 = base64.b64encode(f.read()).decode()
os.remove(pic_name)
xpath解析
- // :从匹配选择的当前节点选择文档中的节点,而不考虑它们的位置。
- / : 匹配当前目录下的直接子节点。
- .. : 匹配当前节点的父节点。
- @:选取属性。
- //* : 选取文档中所有元素
text = """
<?xml version="1.0" encoding="UTF-8"?>
<bookstore>
<book>
<title lang="eng">Harry Potter</title>
<price>29.99</price>
</book>
<book>
<title lang="cn">Learning XML</title>
<price>39.95</price>
<aa lang="cn eng aa bb" name="cc">Learning XML</aa>
</book>
</bookstore>
"""
from lxml import etree
html = etree.HTML(text)
# print(etree.tostring(html).decode('utf-8'))
# 选取所有指定的节点
res = html.xpath('//book')
# 获取指定节点的所有直接子节点
res = html.xpath('//book/aa')
# 获取指定节点的父节点
res = html.xpath("//aa/..")
# 通过属性匹配选择节点
res = html.xpath('//title[@lang="cn"]')
# 获取文本值
res = html.xpath('//title[@lang="cn"]/text()')
res = html.xpath('//price/text()')
# 获取属性值 ['eng', 'cn']
res = html.xpath('//title/@lang')
# 属性多值匹配
res = html.xpath('//aa[contains(@lang,"aa")]')
# 对于属性值有多个的节点,不用contains函数的话,匹配到的是空[]
res = html.xpath('//aa[@lang="aa"]')
# 文本匹配
res = html.xpath('//title[contains(text(), "XML")]')
# 运算符
res = html.xpath('//aa[contains(@lang,"aa") and @name="cc"]')
Python爬虫—代理池维护
大致思路
- 去代理网站上爬取大量代理IP,并将其存储在redis数据库。
- 定时获取redis中的所有代理IP,检测每一个代理IP是否可用。
- 通过flask,对外提供获取代理IP的接口,如果想要使用代理池中的代理IP,只需要访问我们提供的接口即可。
现在网上免费代理IP网站越来越少,而且免费的代理质量非常不稳定,本文只是提供构建代理IP池的一种思路,实战的话还是要想办法获取优质的代理。
代理池系统具体实现思路
代理池完整代码
agent_pool.py 整体流程
存储模块:主要实现的功能是,去一些免费代理网站爬取大量的代理IP,并存储至redis数据库中。redis的Sorted Set结构是一个有序集合,我们会对每一个爬取到的代理IP
设置一个初始化的优先级10,Sorted Set也是通过这个优先级来进行排序的。</br>
- Getter:爬取代理网站的免费代理IP,存入redis
- Tester:从redis中取出代理,测试代理是否可用,并调整代理IP的优先级
- Controller:启动Getter()与Tester()
from Crawler import Crawler
from RedisClient import RedisClient
import traceback
import time
import requests
import multiprocessing
from concurrent import futures
FULL_COUNT = 2000
class Getter(object):
# 爬取代理网站的免费代理IP,存入redis
def __init__(self):
self.redis_client = RedisClient()
self.crawler = Crawler()
def is_full(self):
# 判断代理池是否满了
return self.redis_client.get_proxy_count() >= FULL_COUNT
def run(self):
# 将爬取到的代理存入redis
if not self.is_full():
proxys = self.crawler.get_crawler_proxy()
for proxy in proxys:
self.redis_client.add(proxy)
class Tester(object):
# 从redis中取出代理,测试代理是否可用,并调整代理IP的优先级
def __init__(self, test_url):
self.redisdb = RedisClient()
# 用来测试代理是否可用的地址
self.test_url = test_url
def test_proxy(self, proxy):
try:
if isinstance(proxy, bytes):
proxy = proxy.decode('utf-8')
proxies = {
'http': 'http://' + proxy,
'https': 'https://' + proxy
}
print('正在检测:{}'.format(proxy))
res = requests.get(self.test_url, proxies=proxies, timeout=10)
if res.status_code == 200:
return True, proxy
else:
return False, proxy
# 代理不可用,就降低其优先级
except Exception as e:
return False, proxy
# print('代理检测异常:{} {}'.format(proxy, e))
self.redisdb.decrease(proxy)
print('代理不可用:{}'.format(proxy))
def run(self):
print('启动检测模块......')
try:
# 获取redis中所有爬取到的代理
proxies = self.redisdb.get_all_proxy()
for i in range(0, len(proxies), 50):
test_proxies = proxies[i:i+50]
workers = len(test_proxies)
with futures.ThreadPoolExecutor(workers) as executor:
tasks_res = executor.map(self.test_proxy, test_proxies)
for res, proxy in tasks_res:
if not res:
# 代理不可用,就降低其优先级
self.redisdb.decrease(proxy)
print('代理不可用:{}'.format(proxy))
else:
# 代理可用,将其优先级置为最大
self.redisdb.max(proxy)
print('代理可用:{}'.format(proxy))
except Exception as e:
print(traceback.format_exc())
print('检测模块出错!!!')
class Controller(object):
def control_get(self):
# 获取功能:爬取代理网站,将代理存储到redis
getter = Getter()
while True:
try:
getter.run()
except:
print(traceback.format_exc())
time.sleep(30)
def control_test(self):
# 检测功能,检测redis中的代理是否可用
tester = Tester(test_url='http://www.baidu.com')
while True:
try:
tester.run()
except:
print(traceback.format_exc())
time.sleep(30)
def run(self):
print('代理池开始运行了......')
# 两个进程
get = multiprocessing.Process(target=self.control_get)
get.start()
test = multiprocessing.Process(target=self.control_test)
test.start()
if __name__ == '__main__':
control = Controller()
control.run()
WebAPI_to_get_proxy.py 通过flask向外提供获取代理IP的接口
from flask import Flask, g
import RedisClient
"""
对外提供web接口,通过提供的web接口,来获取redis中的代理
g是上下文对象,处理请求时,用于临时存储的对象,每次请求都会重设这个变量。比如:我们可以获取一些临时请求的用户信息。
"""
app = Flask(__name__)
@app.route('/')
def index():
return '<h2>欢迎来到daacheng代理池系统</h2>'
def get():
if not hasattr(g, 'redis'):
g.redis = RedisClient.RedisClient()
return g.redis
@app.route('/random')
def get_random_proxy():
# 从代理池中返回一个代理
redisdb = get()
return redisdb.get_proxy()
@app.route('/count')
def count():
# 查询代理池中代理的个数
redisdb = get()
return str(redisdb.get_proxy_count())
@app.route('/all')
def get_all():
# 查询代理池中代理的个数
redisdb = get()
return str(redisdb.get_all_proxy())
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
app.debug = True