Python爬虫

爬虫概念

1.robots协议

也叫robots.txt,是存放在网站根目录下的文本文件,用来告诉搜索引擎该网站哪些内容是不应该被抓取的,哪些是可以抓取的。

https://www.csdn.net/robots.txt

User-agent: *
Disallow: /scripts
Disallow: /public
Disallow: /css/
Disallow: /images/
Disallow: /content/
Disallow: /ui/
Disallow: /js/
Disallow: /scripts/
Disallow: /article_preview.html*
Disallow: /tag/
Disallow: /*?*
Disallow: /link/

Sitemap: https://www.csdn.net/sitemap-aggpage-index.xml
Sitemap: https://www.csdn.net/article/sitemap.txt

2.常见的反爬虫措施

1.请求头校验

一般网站会对请求头进行校验,比如Host,UA,Content-Type字段等,模拟请求的时候,这些常见的请求头最好是带上。

2.IP访问次数控制

同一个IP地址短时间内大量发起请求,会引起IP限制,解决方法是用代理IP,或者构建自己的代理IP池。

3.接口请求频率限制

有的网站会控制接口访问的频率,比如有些查询接口,控制两三秒访问一次。

4.接口访问次数限制

每天限制某个IP或账号访问接口的次数,达到上限后出现二次验证或者直接封账号/IP.比如登录接口

5.行为认证

请求次数过多会出现人工认证,如图片验证码,滑动认证,点击认证等,可以对接打码平台。

6,自动化环境检测

selenium自动化工具有的网站会检测出来,大部分可以通过下面两种方式跳过检测,下面两种方式无法处理的话,还可以尝试把页面改为移动端页面(手机模式),最后还有一种方法就是代理服务器拦截修改js代码,把检测selenium的js修改掉。

options = webdriver.ChromeOptions()
# 躲避部分网站selenium检测
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option("useAutomationExtension", False)

driver = webdriver.Chrome(executable_path=chromedriver_path, options=options)

# 躲避部分网站selenium检测
script = "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": script})

对于移动端appium的检测,可以尝试替换为uiautomator2实现自动化

7.数据动态加载

有的数据不是通过html页面的接口请求返回的,抓包分析请求,找到正确的数据接口。

8.请求参数加密

网易云音乐的post请求的请求体就是前端经过js加密后计算得到的,需要逆向js代码

9.返回数据加密

需要逆向js代码,分析如何解密。还有一种像大众点评的评论,需要通过定位去找到文本。

10.动态更新cookies

华为手机云服务,每次请求接口都会重新设置cookies,并且请求头参数也需要跟着cookies一起变化

Python爬虫之requests库

一.发送请求

requests提供了http的所有基本请求方式:

import requests
r = requests.post("http://httpbin.org/post")
r = requests.put("http://httpbin.org/put")
r = requests.delete("http://httpbin.org/delete")
r = requests.head("http://httpbin.org/get")
r = requests.options("http://httpbin.org/get")

基本get请求中参数的传递:

# requests允许使用params关键字参数,以字典的形式来提供get请求url中的参数。
payload = {'key1': 'value1', 'key2': 'value2'}
r = requests.get("http://httpbin.org/get", params=payload)
print(r.url)  # http://httpbin.org/get?key2=value2&key1=value1

# 字典中的value还可以以列表的形式传入
payload = {'key1': 'value1', 'key2': ['value2', 'value3']}

r = requests.get('http://httpbin.org/get', params=payload)
print(r.url)
http://httpbin.org/get?key1=value1&key2=value2&key2=value3

添加请求头headers

url = 'https://api.github.com/some/endpoint'
headers = {'user-agent': 'my-app/0.0.1'}
r = requests.get(url, headers=headers)

Post请求

payload = {'key1': 'value1', 'key2': 'value2'}
r = requests.post("http://httpbin.org/post", data=payload)
print(r.text)
# 可以为 data 参数传入一个元组列表
# 在表单中多个元素使用同一 key 的时候,这种方式尤其有效:
payload = (('key1', 'value1'), ('key1', 'value2'))
r = requests.post('http://httpbin.org/post', data=payload)
print(r.text)
{
  ...
  "form": {
    "key1": [
      "value1",
      "value2"
    ]
  },
  ...
}
# post的为json对象
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
r = requests.post(url, json=payload)

超时设置:

requests.get('http://github.com', timeout=0.001)

二.响应内容

import requests
r = requests.get('https://github.com/timeline.json')
r.encoding='utf-8'
r.text
# [{"repository":{"open_issues":0,"url":"https://github.com/...

网页乱码问题:

# 查看网页编码
print(res.apparent_encoding)
# 设置编码
res.encoding = 'GB2312'

二进制响应内容(r.content)

from PIL import Image
from io import BytesIO
#BytesIO用于操作内存中的二进制数据
img=Image.open(BytesIO(r.content))

JSON响应内容(r.json())

import requests
r = requests.get('https://github.com/timeline.json')
r.json()
# [{u'repository': {u'open_issues': 0, u'url': 'https://github.com/...

响应状态码(r.status_code)

r = requests.get('http://httpbin.org/get')
r.status_code
200

响应头(r.headers)

r.headers
{
    'content-encoding': 'gzip',
    'transfer-encoding': 'chunked',
    'connection': 'close',
    'server': 'nginx/1.0.4',
    'x-runtime': '148ms',
    'etag': '"e1ca502697e5c9317743dc078f67693f"',
    'content-type': 'application/json'
}
r.headers['Content-Type']
'application/json'

三.Cookies

如果某个响应中包含一些 cookie,你可以快速访问它们:

url = 'http://example.com/some/cookie/setting/url'
r = requests.get(url)

r.cookies['example_cookie_name']
# 'example_cookie_value'

要想发送你的cookies到服务器,可以使用 cookies 参数:

url = 'http://httpbin.org/cookies'
cookies = dict(cookies_are='working')

r = requests.get(url, cookies=cookies)
r.text
# '{"cookies": {"cookies_are": "working"}}'

四.会话

requests.Session()这样可以在会话中保留状态,保持cookie等

import requests
s = requests.Session()
s.headers.update({'x-test': 'true'})
r = s.get('http://httpbin.org/headers', headers={'x-test2': 'true'})
print(r.text)

五.代理

如果需要使用代理,你可以通过为任意请求方法提供 proxies 参数来配置单个请求

# http代理
import requests
proxies = {
  "https": "http://41.118.132.69:4433"
}
r = requests.post("http://httpbin.org/post", proxies=proxies)
# socks代理
proxies = {
    'http': 'socks5://user:pass@host:port',
    'https': 'socks5://user:pass@host:port'
}

六.Prepared Request

构造requests.Request对象,将Request对象作为参数传入requests.Session()对象的prepare_request()方法中,最后通过Session对象的send()方法发送请求。

import requests
from requests import Request
url = 'http://httpbin.org/get'
# 创建Session对象
s = requests.Session()
# 构造Request对象
req = Request('GET',url)
# 将Request对象转换成 PreparedRequest对象
prepped = s.prepare_request(req)
# 利用Session对象的send()方法,发送PreparedRequest对象
res = s.send(prepped)
print(res.text)
print(type(prepped))

Python爬虫之BeautifulSoup

格式化输出

html_doc = """
<html><head><title>The Dormouse's story</title></head>
<body>
<p class="title"><b>The Dormouse's story</b></p>

<p class="story">Once upon a time there were three little sisters; and their names were
<a href="http://example.com/elsie" class="sister" id="link1">Elsie</a>,
<a href="http://example.com/lacie" class="sister" id="link2">Lacie</a> and
<a href="http://example.com/tillie" class="sister bro" id="link3">Tillie</a>;
and they lived at the bottom of a well.</p>

<p class="story">...</p>
"""
soup = BeautifulSoup(html_doc, 'html.parser')
print(soup.prettify())

find_all(name , attrs , recursive , string , **kwargs)

# 查找所有的a标签
res = soup.find_all('a')
# # 查找所有的a标签和p标签
res = soup.find_all(['a', 'p'])

# 查找class=title的p标签
res = soup.find_all('p', 'title')

# 指定属性查找  可支持字符串,正则表达式,或者函数
# 指定id查找元素
res = soup.find_all(id="link1")
# 指定href查找 [<a class="sister" href="http://example.com/elsie" id="link1">Elsie</a>]
res = soup.find_all(href=re.compile('elsie'))
# 指定多个属性查找
res = soup.find_all(id='link1', href=re.compile('elsie'))
# 指定多个属性查找 attrs参数
res = soup.find_all(attrs={'id': 'link1', 'href': re.compile('elsie')})

# 通过css搜索
res = soup.find_all(class_="sister bro")
# 通过函数过滤,查找类名长度大于6的元素
res = soup.find_all(class_=lambda x: x is not None and len(x) > 6)

# recursive参数,如果只想搜索直接子节点  recursive=False
res = soup.find_all('title', recursive=False)

# find_all() 方法的返回结果是值包含一个元素的列表
# 而find()方法直接返回第一个结果,没有则返回None.
res = soup.find('a')

CSS选择器

# 类查找
res = soup.select('.sister')
# ID查找
res = soup.select('#link1')
res = soup.select('a#link1')
# 通过是否存在某个属性查找
res = soup.select('a[href]')
# 指定属性值查找
res = soup.select('a[href="http://example.com/tillie"]')

# 查找返回第一个元素
res = soup.select_one('a[href]')

# 获取元素的属性值
res = soup.select_one('a[href]').get('href')
# 获取元素的文本
res = soup.select_one('a[href]').text

Selenium自动化

1.基础操作

from selenium import webdriver
import time
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By

if __name__ == '__main__':
    # 谷歌浏览器驱动
    chromedriver_path = 'chromedriver72.exe'
    options = webdriver.ChromeOptions()
    # 躲避部分网站selenium检测
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option("useAutomationExtension", False)

    driver = webdriver.Chrome(executable_path=chromedriver_path, options=options)

    # 躲避部分网站selenium检测
    script = "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": script})

    # 浏览器最大化
    driver.maximize_window()

    url = 'https://www.python.org/'
    driver.get(url)
    # 显式等待
    wait = WebDriverWait(driver, 20, 1)

    # 在主页输入框搜索requests,并点击搜索
    input_ = wait.until(EC.presence_of_element_located((By.ID, 'id-search-field')))
    input_.send_keys('requests')
    time.sleep(1)
    btn = driver.find_element_by_xpath('//button[@title="Submit this Search"]')
    btn.click()
    time.sleep(10)
    driver.close()

2.元素定位

查找单个元素

最常用的定位元素的两个方法是通过Xpath和id来定位。

  • find_element_by_id
  • find_element_by_xpath

查找多个元素

  • find_elements_by_xpath
  • find_elements_by_name
# 通过xpath查找元素
driver.find_element_by_xpath('//button[@title="Submit this Search"]')
# 通过id查找元素
driver.find_element_by_id('id-search-field')

获取元素内部内容

text = driver.find_element_by_xpath('//*[@id="ISDCaptcha"]/div[2]/div').get_attribute('innerHTML')
if '请绘制图中手势' in text:
    print('出现行为认证')

获取元素指定属性的属性值

driver.find_element_by_xpath('//div[@id="find-step3-phone"]').get_attribute('style')
driver.find_element_by_xpath('//*[@id="imgVerifyCodeP"]').get_attribute('src')

切换到指定iframe

# 通过id或者名称
driver.switch_to.frame("iframeLoginIfm")

driver.switch_to.frame(0)

frame = driver.find_element_by_xpath('//div[@id="loginDiv"]/iframe')
driver.switch_to.frame(frame)

切换到指定窗口

driver.switch_to.window(browser.window_handles[1])

切换到alert弹窗

text = driver.switch_to.alert.text
if '图片验证码输入错误' in text:
    print('图片验证码识别错误')
    driver.switch_to.alert.accept()

3.元素交互

按钮点击

btn = driver.find_element_by_xpath('//div[@role="button"]/div/span/span')
btn.click()

执行js代码

style_ = driver.find_element_by_xpath('//*[@id="passport-login-pop"]').get_attribute('style')
style_ = style_.replace('display: none;', '')
if not style_:
    style_ = 'left: 259px; top: 212px; z-index: 60001;'
js = 'document.getElementById("passport-login-pop").setAttribute("style","{}");'.format(style_)
driver.execute_script(js)

表单输入

input_ = driver.find_element_by_xpath('//input[@name="session[password]" and @dir="auto"]')
input_.send_keys('123qwe')

from selenium.webdriver.common.keys import Keys
input_.send_keys(Keys.BACK_SPACE)

页面滚动

driver.execute_script("""
                (function () {
                    var y = document.body.scrollTop;
                    var step = 100;
                    window.scroll(0, y);
                    function f() {
                        if (y < document.body.scrollHeight) {
                            y += step;
                            window.scroll(0, y);
                            setTimeout(f, 50);
                        }
                        else {
                            window.scroll(0, y);
                            document.title += "scroll-done";
                        }
                    }
                    setTimeout(f, 1000);
                })();
                """)

模拟拖动

from selenium.webdriver.common.action_chains import ActionChains

def get_track(self, distance):
    track = []
    current = 0
    mid = distance * 3 / 4
    t = 0.2
    v = 0
    while current < distance:
        if current < mid:
            a = 2
        else:
            a = -3
        v0 = v
        v = v0 + a * t
        move = v0 * t + 1 / 2 * a * t * t
        current += move
        track.append(round(move))
    return track

# 模拟拖动
btn = wait.until(EC.presence_of_element_located((By.XPATH, xpath_)))
track = get_track(500)
action = ActionChains(browser)
action.click_and_hold(btn).perform()
action.reset_actions()
for i in track:
    action.move_by_offset(xoffset=i, yoffset=0).perform()
    action.reset_actions()

4.等待

显式等待

# 显式等待
wait = WebDriverWait(driver, 20, 1)

input_ = wait.until(EC.presence_of_element_located((By.ID, 'id-search-field')))
input_.send_keys('requests')
time.sleep(1)

隐式等待

from selenium import webdriver

driver = webdriver.Chrome()
# 隐式等待
driver.implicitly_wait(10)
driver.get('https://www.zhihu.com/explore')
logo = driver.find_element_by_id('zh-top-link-logo')
print(logo)
driver.close()

5.其他操作

解决页面加载时间过长问题

有时候页面有些静态文件加载比较耗时,selenium可以不需要等待页面全部加载完全在去查找元素

options = webdriver.ChromeOptions()
# 解决页面加载阻塞问题
options.set_capability('pageLoadStrategy', 'none')
driver = webdriver.Chrome(executable_path=self.chromedriver_path, options=options)

添加请求头

options.add_argument("user-agent={}".format('Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36'))

添加代理

socks5 = "socks5://{}:{}".format(socks5_proxy_ip, socks5_proxy_port)
options.add_argument("--proxy-server={}".format(socks5))

捕捉F12控制台中所有请求记录

from selenium.webdriver.common.desired_capabilities import DesiredCapabilities

d = DesiredCapabilities.CHROME
d['loggingPrefs'] = {'performance': 'ALL'}
d['goog:chromeOptions'] = {
    'perfLoggingPrefs': {
        'enableNetwork': True,
    },
    'w3c': False,
}
options.add_experimental_option('perfLoggingPrefs', {'enableNetwork': True})
options.add_experimental_option('w3c', False)
driver = webdriver.Chrome(executable_path=self.chromedriver_path, options=options, desired_capabilities=d)

# 保存log
log_list = []
for entry in driver.get_log('performance'):
    log_list.append(entry)

屏幕截图,可以截取图片验证码加以识别

import win32con
import win32gui
import win32print
from win32api import GetSystemMetrics
from PIL import Image

def get_real_resolution():
    """获取真实的分辨率"""
    hDC = win32gui.GetDC(0)
    # 横向分辨率
    w = win32print.GetDeviceCaps(hDC, win32con.DESKTOPHORZRES)
    # 纵向分辨率
    h = win32print.GetDeviceCaps(hDC, win32con.DESKTOPVERTRES)
    return w, h

def get_screen_size():
    """获取缩放后的分辨率"""
    w = GetSystemMetrics(0)
    h = GetSystemMetrics(1)
    return w, h

real_resolution = get_real_resolution()
screen_size = get_screen_size()
screen_scale_rate = round(real_resolution[0] / screen_size[0], 2)

pic_name = '***.png'
driver.save_screenshot(pic_name)
# 找到图片验证码元素
element = driver.find_element_by_xpath(xpath_)
left = element.location['x'] * screen_scale_rate
top = element.location['y'] * screen_scale_rate
right = (element.location['x'] + element.size['width']) * screen_scale_rate
bottom = (element.location['y'] + element.size['height']) * screen_scale_rate
im = Image.open(pic_name)
# 裁剪图片
im = im.crop((left, top, right, bottom))
im.save(pic_name)
# 把图片转成base64,利用打码平台接口识别
with open(pic_name, 'rb') as f:
    code_img_base64 = base64.b64encode(f.read()).decode()
os.remove(pic_name)

xpath解析

  • // :从匹配选择的当前节点选择文档中的节点,而不考虑它们的位置。
  • / : 匹配当前目录下的直接子节点。
  • .. : 匹配当前节点的父节点。
  • @:选取属性。
  • //* : 选取文档中所有元素
text = """
            <?xml version="1.0" encoding="UTF-8"?>
            <bookstore>
            <book>
              <title lang="eng">Harry Potter</title>
              <price>29.99</price>
            </book>
            <book>
              <title lang="cn">Learning XML</title>
              <price>39.95</price>
              <aa lang="cn eng aa bb" name="cc">Learning XML</aa>
            </book>
            </bookstore>
"""
from lxml import etree
html = etree.HTML(text)
# print(etree.tostring(html).decode('utf-8'))

# 选取所有指定的节点
res = html.xpath('//book')

# 获取指定节点的所有直接子节点
res = html.xpath('//book/aa')

# 获取指定节点的父节点
res = html.xpath("//aa/..")

# 通过属性匹配选择节点
res = html.xpath('//title[@lang="cn"]')

# 获取文本值
res = html.xpath('//title[@lang="cn"]/text()')
res = html.xpath('//price/text()')

# 获取属性值 ['eng', 'cn']
res = html.xpath('//title/@lang')

# 属性多值匹配
res = html.xpath('//aa[contains(@lang,"aa")]')
# 对于属性值有多个的节点,不用contains函数的话,匹配到的是空[]
res = html.xpath('//aa[@lang="aa"]')

# 文本匹配
res = html.xpath('//title[contains(text(), "XML")]')

# 运算符
res = html.xpath('//aa[contains(@lang,"aa") and @name="cc"]')

Python爬虫—代理池维护

大致思路

  1. 去代理网站上爬取大量代理IP,并将其存储在redis数据库。
  2. 定时获取redis中的所有代理IP,检测每一个代理IP是否可用。
  3. 通过flask,对外提供获取代理IP的接口,如果想要使用代理池中的代理IP,只需要访问我们提供的接口即可。
    现在网上免费代理IP网站越来越少,而且免费的代理质量非常不稳定,本文只是提供构建代理IP池的一种思路,实战的话还是要想办法获取优质的代理。

代理池系统具体实现思路

代理池完整代码

agent_pool.py 整体流程

存储模块:主要实现的功能是,去一些免费代理网站爬取大量的代理IP,并存储至redis数据库中。redis的Sorted Set结构是一个有序集合,我们会对每一个爬取到的代理IP
设置一个初始化的优先级10,Sorted Set也是通过这个优先级来进行排序的。</br>

  • Getter:爬取代理网站的免费代理IP,存入redis
  • Tester:从redis中取出代理,测试代理是否可用,并调整代理IP的优先级
  • Controller:启动Getter()与Tester()
from Crawler import Crawler
from RedisClient import RedisClient
import traceback
import time
import requests
import multiprocessing
from concurrent import futures

FULL_COUNT = 2000

class Getter(object):
    # 爬取代理网站的免费代理IP,存入redis
    def __init__(self):
        self.redis_client = RedisClient()
        self.crawler = Crawler()

    def is_full(self):
        # 判断代理池是否满了
        return self.redis_client.get_proxy_count() >= FULL_COUNT

    def run(self):
        # 将爬取到的代理存入redis
        if not self.is_full():
            proxys = self.crawler.get_crawler_proxy()
            for proxy in proxys:
                self.redis_client.add(proxy)

class Tester(object):
    # 从redis中取出代理,测试代理是否可用,并调整代理IP的优先级
    def __init__(self, test_url):
        self.redisdb = RedisClient()
        # 用来测试代理是否可用的地址
        self.test_url = test_url

    def test_proxy(self, proxy):
        try:
            if isinstance(proxy, bytes):
                proxy = proxy.decode('utf-8')
            proxies = {
                'http': 'http://' + proxy,
                'https': 'https://' + proxy
            }
            print('正在检测:{}'.format(proxy))
            res = requests.get(self.test_url, proxies=proxies, timeout=10)
            if res.status_code == 200:
                return True, proxy
            else:
                return False, proxy
                # 代理不可用,就降低其优先级
        except Exception as e:
            return False, proxy
            # print('代理检测异常:{}  {}'.format(proxy, e))
            self.redisdb.decrease(proxy)
            print('代理不可用:{}'.format(proxy))


    def run(self):
        print('启动检测模块......')
        try:
            # 获取redis中所有爬取到的代理
            proxies = self.redisdb.get_all_proxy()
            for i in range(0, len(proxies), 50):
                test_proxies = proxies[i:i+50]
                workers = len(test_proxies)
                with futures.ThreadPoolExecutor(workers) as executor:
                    tasks_res = executor.map(self.test_proxy, test_proxies)
                    for res, proxy in tasks_res:
                        if not res:
                            # 代理不可用,就降低其优先级
                            self.redisdb.decrease(proxy)
                            print('代理不可用:{}'.format(proxy))
                        else:
                            # 代理可用,将其优先级置为最大
                            self.redisdb.max(proxy)
                            print('代理可用:{}'.format(proxy))

        except Exception as e:
            print(traceback.format_exc())
            print('检测模块出错!!!')

class Controller(object):
    def control_get(self):
        # 获取功能:爬取代理网站,将代理存储到redis
        getter = Getter()
        while True:
            try:
                getter.run()
            except:
                print(traceback.format_exc())
            time.sleep(30)

    def control_test(self):
        # 检测功能,检测redis中的代理是否可用
        tester = Tester(test_url='http://www.baidu.com')
        while True:
            try:
                tester.run()
            except:
                print(traceback.format_exc())
            time.sleep(30)

    def run(self):
        print('代理池开始运行了......')
        # 两个进程
        get = multiprocessing.Process(target=self.control_get)
        get.start()
        test = multiprocessing.Process(target=self.control_test)
        test.start()

if __name__ == '__main__':
    control = Controller()
    control.run()

WebAPI_to_get_proxy.py 通过flask向外提供获取代理IP的接口

from flask import Flask, g
import RedisClient

"""
    对外提供web接口,通过提供的web接口,来获取redis中的代理
    g是上下文对象,处理请求时,用于临时存储的对象,每次请求都会重设这个变量。比如:我们可以获取一些临时请求的用户信息。
"""


app = Flask(__name__)


@app.route('/')
def index():
    return '<h2>欢迎来到daacheng代理池系统</h2>'


def get():
    if not hasattr(g, 'redis'):
        g.redis = RedisClient.RedisClient()
    return g.redis


@app.route('/random')
def get_random_proxy():
    # 从代理池中返回一个代理
    redisdb = get()
    return redisdb.get_proxy()


@app.route('/count')
def count():
    # 查询代理池中代理的个数
    redisdb = get()
    return str(redisdb.get_proxy_count())


@app.route('/all')
def get_all():
    # 查询代理池中代理的个数
    redisdb = get()
    return str(redisdb.get_all_proxy())


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
    app.debug = True