import urllib
import urllib.request

import requests
class Requestsutils:
    """Synchronous helper that sends ``requests`` calls through a fixed proxy mapping.

    The proxy mapping is decided once, at construction time:

    * ``enableProxy=True``  -> every request goes through ``http://<ip>``.
    * ``enableProxy=False`` -> the mapping returned by
      ``urllib.request.getproxies()`` is used.
    """

    def __init__(self, ip: str = None, enableProxy: bool = False) -> None:
        """Build the proxy mapping used by every request.

        Args:
            ip: Proxy address without scheme, e.g. ``"127.0.0.1:7890"``.
                Required when ``enableProxy`` is true.
            enableProxy: When true, route both HTTP and HTTPS traffic
                through ``http://<ip>``.

        Raises:
            ValueError: If ``enableProxy`` is true but ``ip`` is empty or
                ``None`` (previously this silently produced the unusable
                proxy URL ``http://None``).
        """
        if enableProxy:
            if not ip:
                raise ValueError("ip must be provided when enableProxy is True")
            proxy = f"http://{ip}"
            self.proxies = {"http": proxy, "https": proxy}
        else:
            self.proxies = urllib.request.getproxies()

    def send_requsest(self, method: str, url: str, **kwargs):
        """Send one HTTP request and return the ``requests`` response.

        Args:
            method: HTTP verb such as ``"GET"``.
            url: Target URL.
            **kwargs: Forwarded unchanged to ``requests.request``. A
                caller-supplied ``proxies`` value takes precedence over the
                instance default.

        Returns:
            Whatever ``requests.request`` returns.
        """
        # setdefault instead of an explicit keyword: passing ``proxies`` both
        # explicitly and through **kwargs would raise TypeError.
        kwargs.setdefault("proxies", self.proxies)
        return requests.request(method, url, **kwargs)

    # Correctly spelled alias; the misspelled name is kept for existing callers.
    send_request = send_requsest
# 该方法主要针对本地电脑开启代理后需要手动设置代理 IP 的痛点。如果需要设置远程代理，则在实例化对象时传入远程 IP + 端口，然后将 enableProxy 设置为 True 即可。
import aiohttp
from functools import wraps
from asyncio.proactor_events import _ProactorBasePipeTransport
def silence_event_loop_closed(func):
    """Decorate a method so that one specific ``RuntimeError`` is ignored.

    The returned wrapper calls ``func`` with the same arguments. If ``func``
    raises ``RuntimeError`` whose message is exactly ``'Event loop is closed'``
    the error is swallowed and the wrapper returns ``None``; any other
    exception propagates unchanged.
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            result = func(self, *args, **kwargs)
        except RuntimeError as exc:
            if str(exc) == 'Event loop is closed':
                return None
            raise
        return result

    return wrapper
# Replace the proactor pipe transport's finalizer with a wrapped version so
# that a RuntimeError whose message is exactly 'Event loop is closed', raised
# from ``__del__``, is swallowed instead of propagating.
# NOTE(review): presumably a workaround for teardown noise with the Windows
# proactor event loop — confirm. The patch is applied at import time.
_ProactorBasePipeTransport.__del__ = silence_event_loop_closed(_ProactorBasePipeTransport.__del__)
class AioHttpClient:
    """Asynchronous HTTP client that reuses a single ``aiohttp.ClientSession``.

    The session is created in ``__init__`` and must be released with
    ``close_session()``; alternatively use the instance as an async context
    manager (``async with AioHttpClient() as client: ...``), which closes the
    session on exit.
    """

    def __init__(self, ip: str = None, enableProxy: bool = False) -> None:
        """Open the session and remember the proxy settings.

        Args:
            ip: Proxy address without scheme, e.g. ``"127.0.0.1:7890"``.
                Required when ``enableProxy`` is true.
            enableProxy: When true, each request is sent through
                ``http://<ip>`` unless the caller passes its own ``proxy``.

        Raises:
            ValueError: If ``enableProxy`` is true but ``ip`` is empty or
                ``None`` (previously this silently produced ``http://None``).
        """
        # Validate first so a bad call cannot leave an unclosed session behind.
        if enableProxy and not ip:
            raise ValueError("ip must be provided when enableProxy is True")
        # NOTE(security): ssl=False disables TLS certificate verification for
        # every request made through this session. Kept for compatibility;
        # review before using against untrusted networks.
        self.session = aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(ssl=False),
            trust_env=True,
        )
        # Only consulted when enableProxy is true.
        self.proxy = f"http://{ip}"
        self.enableProxy = enableProxy

    async def __aenter__(self):
        """Return the client itself for use in ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the session when the ``async with`` block exits."""
        await self.close_session()

    async def send_request(self, method, url, **kwargs):
        """Send one request through the shared session.

        Args:
            method: HTTP verb such as ``"GET"``.
            url: Target URL.
            **kwargs: Forwarded unchanged to ``session.request``. A
                caller-supplied ``proxy`` takes precedence over the instance
                setting.

        Returns:
            The awaited result of ``session.request``. Read the body before
            calling ``close_session()``.
        """
        # setdefault instead of an explicit keyword: passing ``proxy`` both
        # explicitly and through **kwargs would raise TypeError.
        kwargs.setdefault("proxy", self.proxy if self.enableProxy else None)
        return await self.session.request(method, url, **kwargs)

    async def close_session(self):
        """Close the underlying session."""
        await self.session.close()
import asyncio
async def main():
    """Fetch the module-level ``url`` with ``AioHttpClient`` and print the body.

    The response body is read before the session is closed: reading it
    afterwards can fail because the underlying connection has already been
    released. The session is closed in ``finally`` so it is not leaked when
    the request or the body read raises.
    """
    client = AioHttpClient()
    try:
        data = await client.send_request("GET", url)
        body = await data.text()
    finally:
        await client.close_session()
    print(body)
if __name__ == "__main__":
    # Target shared by both demos; main() reads it as a module-level name.
    url = "http://tanblog.cc"

    # Synchronous demo.
    sync_response = Requestsutils().send_requsest("GET", url)
    print(sync_response.text)

    # Asynchronous demo.
    asyncio.run(main())