四川成都營銷型網(wǎng)站數(shù)據(jù)分析網(wǎng)站
通過requests.session().request 封裝request方法
考慮到請(qǐng)求HTTP/2.0
同時(shí)封裝httpx 來處理HTTP/2.0的請(qǐng)求
封裝requests
# 遇到請(qǐng)求失敗的情況時(shí) 重新請(qǐng)求,請(qǐng)求5次等待2s
@retry(stop_max_attempt_number=5, retry_on_result=lambda re_data: re_data is None, wait_fixed=2000)def requests_request(self, method, url, params=None, data=None, json=None, headers=None, files=None, verify=False,cert=None, timeout=None, proxies=None, proxy=None, **kwargs):# 對(duì)異常進(jìn)行捕獲try:"""封裝request請(qǐng)求,將請(qǐng)求方法、請(qǐng)求地址,請(qǐng)求參數(shù)、請(qǐng)求頭等信息入?yún)?。?:verify: True/False,默認(rèn)為True,認(rèn)證SSL證書開關(guān);cert: 本地SSL證書。如果不需要ssl認(rèn)證,可將這兩個(gè)入?yún)⑷サ羰褂胹ession管理器requests.session(): 維持會(huì)話,跨請(qǐng)求的時(shí)候保存參數(shù) """# 處理代理proxies = Noneif proxy:proxies = {'http://': 'http://' + proxy,'https://': 'https://' + proxy,}# 使用requests.session().request 請(qǐng)求re_data = requests.session().request(method, url, params=params, data=data, json=json, headers=headers,files=files, cert=cert, timeout=timeout, verify=verify,proxies=proxies, **kwargs)# 異常處理 報(bào)錯(cuò)顯示具體信息except Exception as e:re_data = None# 打印異常print("請(qǐng)求失敗:{0}".format(e))logger.error("Error occurred: %s", str(e), exc_info=True)# 重新拋出異常,觸發(fā) retry 機(jī)制raise e# 返回響應(yīng)結(jié)果return re_data
封裝httpx
@retry(stop_max_attempt_number=5, retry_on_result=lambda re_data: re_data is None, wait_fixed=2000)def httpx_request(self, method, url, is_http2=False, content=None, data=None, files=None, json=None, params=None,headers=None, cookies=None, timeout=None, extensions=None, proxy=None, **kwargs):# 對(duì)異常進(jìn)行捕獲try:"""使用client method.upper() 請(qǐng)求方法都轉(zhuǎn)為大寫"""# 處理代理proxies = Noneif proxy:proxies = {'http://': 'http://' + proxy,'https://': 'https://' + proxy,}re_data = httpx.Client(http2=is_http2, proxies=proxies).request(method.upper(), url, content=content,data=data, files=files, json=json,params=params, headers=headers,cookies=cookies, timeout=timeout,extensions=extensions, **kwargs)# 異常處理 報(bào)錯(cuò)顯示具體信息except Exception as e:re_data = None# 打印異常print("請(qǐng)求失敗:{0}".format(e))logger.error("Error occurred: %s", str(e), exc_info=True)# 重新拋出異常,觸發(fā) retry 機(jī)制raise e# 返回響應(yīng)結(jié)果return re_data
將兩個(gè)請(qǐng)求封裝在一個(gè)方法里
@retry(stop_max_attempt_number=5, retry_on_result=lambda re_data: re_data is None, wait_fixed=2000)def request(self, method, url, is_http2=False, params=None, data=None, json=None, headers=None, files=None,verify=False, cert=None, timeout=None, proxies=None, content=None, cookies=None, extensions=None,**kwargs):try:if is_http2:re_data = self.httpx_request(method=method.upper(), url=url, is_http2=is_http2, content=content,data=data, files=files, json=json, params=params, headers=headers,cookies=cookies, timeout=timeout, extensions=extensions, **kwargs)else:re_data = self.requests_request(method=method, url=url, params=params, data=data, json=json,headers=headers, files=files, cert=cert, timeout=timeout, verify=verify,proxies=proxies, **kwargs)# 異常處理 報(bào)錯(cuò)顯示具體信息except Exception as e:re_data = None# 打印異常print("請(qǐng)求失敗:{0}".format(e))logger.error("Error occurred: %s", str(e), exc_info=True)# 重新拋出異常,觸發(fā) retry 機(jī)制raise e# 返回響應(yīng)結(jié)果return re_data
通過is_http2來區(qū)分
測試代碼如下
if __name__ == '__main__':# request_requests 使用requests請(qǐng)求request_data = request_main.requests_request("get", 'https://spa16.scrape.center/')if request_data:print(request_data.text)print(request_data.status_code)# httpx 請(qǐng)求HTTP/2.0# response = re.httpx_request('GET', 'https://spa16.scrape.center/', True)# httpx 一般請(qǐng)求# headers = {'User-Agent': 'my-app/0.0.1'}# response = re.httpx_request('get', 'https://www.httpbin.org/get',params={'name': 'germey'})# print(response.text)# print(response.status_code)print(datetime.datetime.now())