前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Django Rest Framework(认证、权限、限制访问频率)

Django Rest Framework(认证、权限、限制访问频率)

作者头像
用户1214487
发布2022-03-26 14:20:15
2.7K0
发布2022-03-26 14:20:15
举报
文章被收录于专栏:Python

一、认证和授权

a. 用户url传入的token认证

代码语言:javascript
复制
from django.conf.urls import url, include
from web.viewsimport TestView

urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
代码语言:javascript
复制
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework.permissions import BasePermission

from rest_framework.request import Request
from rest_framework import exceptions

token_list = [
    'sfsfss123kuf3j123',
    'asijnfowerkkf9812',
]


class TestAuthentication(BaseAuthentication):
    def authenticate(self, request):
        """
        用户认证,如果验证成功后返回元组: (用户,用户Token)
        :param request: 
        :return: 
            None,表示跳过该验证;
                如果跳过了所有认证,默认用户和Token和使用配置文件进行设置
                self._authenticator = None
                if api_settings.UNAUTHENTICATED_USER:
                    self.user = api_settings.UNAUTHENTICATED_USER() # 默认值为:匿名用户
                else:
                    self.user = None
        
                if api_settings.UNAUTHENTICATED_TOKEN:
                    self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默认值为:None
                else:
                    self.auth = None
            (user,token)表示验证通过并设置用户名和Token;
            AuthenticationFailed异常
        """
        val = request.query_params.get('token')
        if val not in token_list:
            raise exceptions.AuthenticationFailed("用户认证失败")

        return ('登录用户', '用户token')

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
         # 验证失败时,返回的响应头WWW-Authenticate对应的值
        pass




class TestView(APIView):
    # 认证的动作是由request.user触发
    authentication_classes = [TestAuthentication, ]

    # 权限
    # 循环执行所有的权限
    permission_classes = [TestPermission, ]

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        return Response('PUT请求,响应内容')

views.py

b. 请求头认证

代码语言:javascript
复制
from django.conf.urls import url, include
from web.viewsimport TestView

urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
代码语言:javascript
复制
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework.request import Request
from rest_framework import exceptions

token_list = [
    'sfsfss123kuf3j123',
    'asijnfowerkkf9812',
]


class TestAuthentication(BaseAuthentication):
    def authenticate(self, request):
        """
        用户认证,如果验证成功后返回元组: (用户,用户Token)
        :param request: 
        :return: 
            None,表示跳过该验证;
                如果跳过了所有认证,默认用户和Token和使用配置文件进行设置
                self._authenticator = None
                if api_settings.UNAUTHENTICATED_USER:
                    self.user = api_settings.UNAUTHENTICATED_USER()
                else:
                    self.user = None
        
                if api_settings.UNAUTHENTICATED_TOKEN:
                    self.auth = api_settings.UNAUTHENTICATED_TOKEN()
                else:
                    self.auth = None
            (user,token)表示验证通过并设置用户名和Token;
            AuthenticationFailed异常
        """
        import base64
        auth = request.META.get('HTTP_AUTHORIZATION', b'')
        if auth:
            auth = auth.encode('utf-8')
        auth = auth.split()
        if not auth or auth[0].lower() != b'basic':
            raise exceptions.AuthenticationFailed('验证失败')
        if len(auth) != 2:
            raise exceptions.AuthenticationFailed('验证失败')
        username, part, password = base64.b64decode(auth[1]).decode('utf-8').partition(':')
        if username == 'alex' and password == '123':
            return ('登录用户', '用户token')
        else:
            raise exceptions.AuthenticationFailed('用户名或密码错误')

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        return 'Basic realm=api'


class TestView(APIView):
    authentication_classes = [TestAuthentication, ]
    permission_classes = []

    def get(self, request, *args, **kwargs):
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        return Response('PUT请求,响应内容')

c. 多个认证规则

代码语言:javascript
复制
from django.conf.urls import url, include
from web.views.s2_auth import TestView

urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
代码语言:javascript
复制
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework.request import Request
from rest_framework import exceptions

token_list = [
    'sfsfss123kuf3j123',
    'asijnfowerkkf9812',
]


class Test1Authentication(BaseAuthentication):
    def authenticate(self, request):
        """
        用户认证,如果验证成功后返回元组: (用户,用户Token)
        :param request: 
        :return: 
            None,表示跳过该验证;
                如果跳过了所有认证,默认用户和Token和使用配置文件进行设置
                self._authenticator = None
                if api_settings.UNAUTHENTICATED_USER:
                    self.user = api_settings.UNAUTHENTICATED_USER() # 默认值为:匿名用户
                else:
                    self.user = None

                if api_settings.UNAUTHENTICATED_TOKEN:
                    self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默认值为:None
                else:
                    self.auth = None
            (user,token)表示验证通过并设置用户名和Token;
            AuthenticationFailed异常
        """
        import base64
        auth = request.META.get('HTTP_AUTHORIZATION', b'')
        if auth:
            auth = auth.encode('utf-8')
        else:
            return None
        print(auth,'xxxx')
        auth = auth.split()
        if not auth or auth[0].lower() != b'basic':
            raise exceptions.AuthenticationFailed('验证失败')
        if len(auth) != 2:
            raise exceptions.AuthenticationFailed('验证失败')
        username, part, password = base64.b64decode(auth[1]).decode('utf-8').partition(':')
        if username == 'alex' and password == '123':
            return ('登录用户', '用户token')
        else:
            raise exceptions.AuthenticationFailed('用户名或密码错误')

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        # return 'Basic realm=api'
        pass

class Test2Authentication(BaseAuthentication):
    def authenticate(self, request):
        """
        用户认证,如果验证成功后返回元组: (用户,用户Token)
        :param request: 
        :return: 
            None,表示跳过该验证;
                如果跳过了所有认证,默认用户和Token和使用配置文件进行设置
                self._authenticator = None
                if api_settings.UNAUTHENTICATED_USER:
                    self.user = api_settings.UNAUTHENTICATED_USER() # 默认值为:匿名用户
                else:
                    self.user = None
        
                if api_settings.UNAUTHENTICATED_TOKEN:
                    self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默认值为:None
                else:
                    self.auth = None
            (user,token)表示验证通过并设置用户名和Token;
            AuthenticationFailed异常
        """
        val = request.query_params.get('token')
        if val not in token_list:
            raise exceptions.AuthenticationFailed("用户认证失败")

        return ('登录用户', '用户token')

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        pass


class TestView(APIView):
    authentication_classes = [Test1Authentication, Test2Authentication]
    permission_classes = []

    def get(self, request, *args, **kwargs):
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        return Response('PUT请求,响应内容')

views.py

d. 认证和权限

代码语言:javascript
复制
from django.conf.urls import url, include
from web.views import TestView

urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
代码语言:javascript
复制
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework.permissions import BasePermission

from rest_framework.request import Request
from rest_framework import exceptions

token_list = [
    'sfsfss123kuf3j123',
    'asijnfowerkkf9812',
]


class TestAuthentication(BaseAuthentication):
    def authenticate(self, request):
        """
        用户认证,如果验证成功后返回元组: (用户,用户Token)
        :param request: 
        :return: 
            None,表示跳过该验证;
                如果跳过了所有认证,默认用户和Token和使用配置文件进行设置
                self._authenticator = None
                if api_settings.UNAUTHENTICATED_USER:
                    self.user = api_settings.UNAUTHENTICATED_USER() # 默认值为:匿名用户
                else:
                    self.user = None
        
                if api_settings.UNAUTHENTICATED_TOKEN:
                    self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默认值为:None
                else:
                    self.auth = None
            (user,token)表示验证通过并设置用户名和Token;
            AuthenticationFailed异常
        """
        val = request.query_params.get('token')
        if val not in token_list:
            raise exceptions.AuthenticationFailed("用户认证失败")

        return ('登录用户', '用户token')

    def authenticate_header(self, request):
        """
        Return a string to be used as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied` responses.
        """
        pass


class TestPermission(BasePermission):
    message = "权限验证失败"

    def has_permission(self, request, view):
        """
        判断是否有权限访问当前请求
        Return `True` if permission is granted, `False` otherwise.
        :param request: 
        :param view: 
        :return: True有权限;False无权限
        """
        if request.user == "管理员":
            return True

    # GenericAPIView中get_object时调用
    def has_object_permission(self, request, view, obj):
        """
        视图继承GenericAPIView,并在其中使用get_object时获取对象时,触发单独对象权限验证
        Return `True` if permission is granted, `False` otherwise.
        :param request: 
        :param view: 
        :param obj: 
        :return: True有权限;False无权限
        """
        if request.user == "管理员":
            return True


class TestView(APIView):
    # 认证的动作是由request.user触发
    authentication_classes = [TestAuthentication, ]

    # 权限
    # 循环执行所有的权限
    permission_classes = [TestPermission, ]

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        return Response('PUT请求,响应内容')

e. 全局使用

上述操作中均是对单独视图进行特殊配置,如果想要对全局进行配置,则需要再配置文件中写入即可。

代码语言:javascript
复制
REST_FRAMEWORK = {
    'UNAUTHENTICATED_USER': None,
    'UNAUTHENTICATED_TOKEN': None,
    "DEFAULT_AUTHENTICATION_CLASSES": [
        "web.utils.TestAuthentication",
    ],
    "DEFAULT_PERMISSION_CLASSES": [
        "web.utils.TestPermission",
    ],
}
代码语言:javascript
复制
from django.conf.urls import url, include
from web.views import TestView

urlpatterns = [
    url(r'^test/', TestView.as_view()),
]

urls.py
代码语言:javascript
复制
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response

class TestView(APIView):

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        return Response('PUT请求,响应内容')

g.自定义认证工作

代码语言:javascript
复制
 1 #
 2 class MyAuthtication(BasicAuthentication):
 3     def authenticate(self, request):
 4         token = request.query_params.get('token')  #注意是没有GET的,用query_params表示
 5         if token == 'zxxzzxzc':
 6             return ('uuuuuu','afsdsgdf') #返回user,auth
 7         # raise AuthenticationFailed('认证错误')  #只要抛出认证错误这样的异常就会去执行下面的函数
 8         raise APIException('认证错误')
 9     def authenticate_header(self, request):  #认证不成功的时候执行
10         return 'Basic reala="api"'
11 
12 class UserView(APIView):
13     authentication_classes = [MyAuthtication,]
14     def get(self,request,*args,**kwargs):
15         print(request.user)
16         print(request.auth)
17         return Response('用户列表')

二、权限

1、需求:Host是匿名用户和用户都能访问 #匿名用户的request.user = none;User只有注册用户能访问

代码语言:javascript
复制
1 from app03 import views
2 from django.conf.urls import url
3 urlpatterns = [
4     # django rest framework
5     url('^auth/', views.AuthView.as_view()),
6     url(r'^hosts/', views.HostView.as_view()),
7     url(r'^users/', views.UsersView.as_view()),
8     url(r'^salary/', views.SalaryView.as_view()),
9 ]
代码语言:javascript
复制
 1 from django.shortcuts import render
 2 from rest_framework.views import APIView  #继承的view
 3 from rest_framework.response import  Response #友好的返回
 4 from rest_framework.authentication import BaseAuthentication   #认证的类
 5 from rest_framework.authentication import BasicAuthentication
 6 from app01 import models
 7 from rest_framework import  exceptions
 8 from rest_framework.permissions import AllowAny   #权限在这个类里面
 9 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle
10 # Create your views here.
11 # +++++++++++++++认证类和权限类========================
12 class MyAuthentication(BaseAuthentication):
13     def authenticate(self, request):
14         token = request.query_params.get('token')
15         obj = models.UserInfo.objects.filter(token=token).first()
16         if obj :  #如果认证成功,返回用户名和auth
17             return (obj.username,obj)
18         return None  #如果没有认证成功就不处理,进行下一步
19 
20     def authenticate_header(self, request):
21         pass
22 
23 class MyPermission(object):
24     message = '无权访问'
25     def has_permission(self,request,view):  #has_permission里面的self是view视图对象
26         if request.user:
27             return True  #如果不是匿名用户就说明有权限
28         return False  #否则无权限
29 
30 class AdminPermission(object):
31     message = '无权访问'
32     def has_permission(self, request, view):  # has_permission里面的self是view视图对象
33         if request.user=='haiyun':
34             return True  # 返回True表示有权限
35         return False #返回False表示无权限
36 
37 # +++++++++++++++++++++++++++
38 class AuthView(APIView):
39     authentication_classes = []  #认证页面不需要认证
40 
41     def get(self,request):
42         self.dispatch
43         return '认证列表'
44 
45 class HostView(APIView):
46     '''需求:
47           Host是匿名用户和用户都能访问  #匿名用户的request.user = none
48           User只有注册用户能访问
49     '''
50     authentication_classes = [MyAuthentication,]
51     permission_classes = []  #都能访问就没必要设置权限了
52     def get(self,request):
53         print(request.user)
54         print(request.auth)
55         return Response('主机列表')
56 
57 class UsersView(APIView):
58     '''用户能访问,request.user里面有值'''
59     authentication_classes = [MyAuthentication,]
60     permission_classes = [MyPermission,]
61     def get(self,request):
62         print(request.user,'111111111')
63         return Response('用户列表')
64 
65     def permission_denied(self, request, message=None):
66         """
67         If request is not permitted, determine what kind of exception to raise.
68         """
69         if request.authenticators and not request.successful_authenticator:
70             '''如果没有通过认证,并且权限中return False了,就会报下面的这个异常了'''
71             raise exceptions.NotAuthenticated(detail='无权访问')
72         raise exceptions.PermissionDenied(detail=message)
代码语言:javascript
复制
 1 class SalaryView(APIView):
 2     '''用户能访问'''
 3     message ='无权访问'
 4     authentication_classes = [MyAuthentication,]  #验证是不是用户
 5     permission_classes = [MyPermission,AdminPermission,] #再看用户有没有权限,如果有权限在判断有没有管理员的权限
 6     def get(self,request):
 7         return Response('薪资列表')
 8 
 9     def permission_denied(self, request, message=None):
10         """
11         If request is not permitted, determine what kind of exception to raise.
12         """
13         if request.authenticators and not request.successful_authenticator:
14             '''如果没有通过认证,并且权限中return False了,就会报下面的这个异常了'''
15             raise exceptions.NotAuthenticated(detail='无权访问')
16         raise exceptions.PermissionDenied(detail=message)

如果遇上下面这样的情况,是因为没有通过认证,并且权限中return False了,可以自定制错误信息为中文,参考源码

代码语言:javascript
复制
    def check_permissions(self, request):
        """
        Check if the request should be permitted.
        Raises an appropriate exception if the request is not permitted.
        """
        for permission in self.get_permissions():
            #循环每一个permission对象,调用has_permission
            #如果False,则抛出异常
            #True 说明有权访问
            if not permission.has_permission(request, self):
                self.permission_denied(
                    request, message=getattr(permission, 'message', None)
                )
代码语言:javascript
复制
    def permission_denied(self, request, message=None):
        """
        If request is not permitted, determine what kind of exception to raise.
        """
        if request.authenticators and not request.successful_authenticator:
            '''如果没有通过认证,并且权限中return False了,就会报下面的这个异常了'''
            raise exceptions.NotAuthenticated()
        raise exceptions.PermissionDenied(detail=message)

那么我们可以重写permission_denied这个方法,如下:

代码语言:javascript
复制
 1 class UsersView(APIView):
 2     '''用户能访问,request.user里面有值'''
 3     authentication_classes = [MyAuthentication,]
 4     permission_classes = [MyPermission,]
 5     def get(self,request):
 6         return Response('用户列表')
 7 
 8     def permission_denied(self, request, message=None):
 9         """
10         If request is not permitted, determine what kind of exception to raise.
11         """
12         if request.authenticators and not request.successful_authenticator:
13             '''如果没有通过认证,并且权限中return False了,就会报下面的这个异常了'''
14             raise exceptions.NotAuthenticated(detail='无权访问')
15         raise exceptions.PermissionDenied(detail=message)

2. 全局使用

上述操作中均是对单独视图进行特殊配置,如果想要对全局进行配置,则需要再配置文件中写入即可。

代码语言:javascript
复制
 1 REST_FRAMEWORK = {
 2     'UNAUTHENTICATED_USER': None,
 3     'UNAUTHENTICATED_TOKEN': None,  #将匿名用户设置为None
 4     "DEFAULT_AUTHENTICATION_CLASSES": [
 5         "app01.utils.MyAuthentication",
 6     ],
 7     'DEFAULT_PERMISSION_CLASSES':[
 8         "app03.utils.MyPermission",#设置路径,
 9     ]
10 }
代码语言:javascript
复制
 1 class AuthView(APIView):
 2     authentication_classes = []  #认证页面不需要认证
 3 
 4     def get(self,request):
 5         self.dispatch
 6         return '认证列表'
 7 
 8 class HostView(APIView):
 9     '''需求:
10           Host是匿名用户和用户都能访问  #匿名用户的request.user = none
11           User只有注册用户能访问
12     '''
13     authentication_classes = [MyAuthentication,]
14     permission_classes = []  #都能访问就没必要设置权限了
15     def get(self,request):
16         print(request.user)
17         print(request.auth)
18         return Response('主机列表')
19 
20 class UsersView(APIView):
21     '''用户能访问,request.user里面有值'''
22     authentication_classes = [MyAuthentication,]
23     permission_classes = [MyPermission,]
24     def get(self,request):
25         print(request.user,'111111111')
26         return Response('用户列表')
27 
28     def permission_denied(self, request, message=None):
29         """
30         If request is not permitted, determine what kind of exception to raise.
31         """
32         if request.authenticators and not request.successful_authenticator:
33             '''如果没有通过认证,并且权限中return False了,就会报下面的这个异常了'''
34             raise exceptions.NotAuthenticated(detail='无权访问')
35         raise exceptions.PermissionDenied(detail=message)
36 
37 
38 class SalaryView(APIView):
39     '''用户能访问'''
40     message ='无权访问'
41     authentication_classes = [MyAuthentication,]  #验证是不是用户
42     permission_classes = [MyPermission,AdminPermission,] #再看用户有没有权限,如果有权限在判断有没有管理员的权限
43     def get(self,request):
44         return Response('薪资列表')
45 
46     def permission_denied(self, request, message=None):
47         """
48         If request is not permitted, determine what kind of exception to raise.
49         """
50         if request.authenticators and not request.successful_authenticator:
51             '''如果没有通过认证,并且权限中return False了,就会报下面的这个异常了'''
52             raise exceptions.NotAuthenticated(detail='无权访问')
53         raise exceptions.PermissionDenied(detail=message)

三、用户访问次数/频率限制

1、为什么要限流呢

答:

  • - 第一点:爬虫,反爬
  • - 第二点:控制 API 访问次数
    • - 登录用户的用户名可以做标识
    • 匿名用户可以参考 ip,但是 ip可以加代理。

2、限制访问频率源码分析

代码语言:javascript
复制
1  self.check_throttles(request)
代码语言:javascript
复制
 1     def check_throttles(self, request):
 2         """
 3         Check if request should be throttled.
 4         Raises an appropriate exception if the request is throttled.
 5         """
 6         for throttle in self.get_throttles():
 7             #循环每一个throttle对象,执行allow_request方法
 8             # allow_request:
 9                 #返回False,说明限制访问频率
10                 #返回True,说明不限制,通行
11             if not throttle.allow_request(request, self):
12                 self.throttled(request, throttle.wait())
13                 #throttle.wait()表示还要等多少秒就能访问了
代码语言:javascript
复制
1     def get_throttles(self):
2         """
3         Instantiates and returns the list of throttles that this view uses.
4         """
5         #返回对象
6         return [throttle() for throttle in self.throttle_classes]
代码语言:javascript
复制
1 throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES
代码语言:javascript
复制
 1 class BaseThrottle(object):
 2     """
 3     Rate throttling of requests.
 4     """
 5 
 6     def allow_request(self, request, view):
 7         """
 8         Return `True` if the request should be allowed, `False` otherwise.
 9         """
10         raise NotImplementedError('.allow_request() must be overridden')
11 
12     def get_ident(self, request):
13         """
14         Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
15         if present and number of proxies is > 0. If not use all of
16         HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
17         """
18         xff = request.META.get('HTTP_X_FORWARDED_FOR')
19         remote_addr = request.META.get('REMOTE_ADDR')
20         num_proxies = api_settings.NUM_PROXIES
21 
22         if num_proxies is not None:
23             if num_proxies == 0 or xff is None:
24                 return remote_addr
25             addrs = xff.split(',')
26             client_addr = addrs[-min(num_proxies, len(addrs))]
27             return client_addr.strip()
28 
29         return ''.join(xff.split()) if xff else remote_addr
30 
31     def wait(self):
32         """
33         Optionally, return a recommended number of seconds to wait before
34         the next request.
35         """
36         return None
代码语言:javascript
复制
1 zz
代码语言:javascript
复制
1   def throttled(self, request, wait):
2         """
3         If request is throttled, determine what kind of exception to raise.
4         """
5         raise exceptions.Throttled(wait)
代码语言:javascript
复制
 1 class Throttled(APIException):
 2     status_code = status.HTTP_429_TOO_MANY_REQUESTS
 3     default_detail = _('Request was throttled.')
 4     extra_detail_singular = 'Expected available in {wait} second.'
 5     extra_detail_plural = 'Expected available in {wait} seconds.'
 6     default_code = 'throttled'
 7 
 8     def __init__(self, wait=None, detail=None, code=None):
 9         if detail is None:
10             detail = force_text(self.default_detail)
11         if wait is not None:
12             wait = math.ceil(wait)
13             detail = ' '.join((
14                 detail,
15                 force_text(ungettext(self.extra_detail_singular.format(wait=wait),
16                                      self.extra_detail_plural.format(wait=wait),
17                                      wait))))
18         self.wait = wait
19         super(Throttled, self).__init__(detail, code)

3.方法

a. 基于用户IP限制访问频率

代码语言:javascript
复制
from django.conf.urls import url, include
from web.views import TestView

urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
代码语言:javascript
复制
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from rest_framework.views import APIView
from rest_framework.response import Response

from rest_framework import exceptions
from rest_framework.throttling import BaseThrottle
from rest_framework.settings import api_settings

# 保存访问记录
RECORD = {
    '用户IP': [12312139, 12312135, 12312133, ]
}


class TestThrottle(BaseThrottle):
    ctime = time.time

    def get_ident(self, request):
        """
        根据用户IP和代理IP,当做请求者的唯一IP
        Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
        if present and number of proxies is > 0. If not use all of
        HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
        """
        xff = request.META.get('HTTP_X_FORWARDED_FOR')
        remote_addr = request.META.get('REMOTE_ADDR')
        num_proxies = api_settings.NUM_PROXIES

        if num_proxies is not None:
            if num_proxies == 0 or xff is None:
                return remote_addr
            addrs = xff.split(',')
            client_addr = addrs[-min(num_proxies, len(addrs))]
            return client_addr.strip()

        return ''.join(xff.split()) if xff else remote_addr

    def allow_request(self, request, view):
        """
        是否仍然在允许范围内
        Return `True` if the request should be allowed, `False` otherwise.
        :param request: 
        :param view: 
        :return: True,表示可以通过;False表示已超过限制,不允许访问
        """
        # 获取用户唯一标识(如:IP)

        # 允许一分钟访问10次
        num_request = 10
        time_request = 60

        now = self.ctime()
        ident = self.get_ident(request)
        self.ident = ident
        if ident not in RECORD:
            RECORD[ident] = [now, ]
            return True
        history = RECORD[ident]
        while history and history[-1] <= now - time_request:
            history.pop()
        if len(history) < num_request:
            history.insert(0, now)
            return True

    def wait(self):
        """
        多少秒后可以允许继续访问
        Optionally, return a recommended number of seconds to wait before
        the next request.
        """
        last_time = RECORD[self.ident][0]
        now = self.ctime()
        return int(60 + last_time - now)


class TestView(APIView):
    throttle_classes = [TestThrottle, ]

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        return Response('PUT请求,响应内容')

    def throttled(self, request, wait):
        """
        访问次数被限制时,定制错误信息
        """

        class Throttled(exceptions.Throttled):
            default_detail = '请求被限制.'
            extra_detail_singular = '请 {wait} 秒之后再重试.'
            extra_detail_plural = '请 {wait} 秒之后再重试.'

        raise Throttled(wait)

b. 基于用户IP显示访问频率(利于Django缓存)

代码语言:javascript
复制
REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_RATES': {
        'test_scope': '10/m',
    },
}
代码语言:javascript
复制
from django.conf.urls import url, include
from web.views import TestView

urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
代码语言:javascript
复制
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response

from rest_framework import exceptions
from rest_framework.throttling import SimpleRateThrottle


class TestThrottle(SimpleRateThrottle):

    # 配置文件定义的显示频率的Key
    scope = "test_scope"

    def get_cache_key(self, request, view):
        """
        Should return a unique cache-key which can be used for throttling.
        Must be overridden.

        May return `None` if the request should not be throttled.
        """
        if not request.user:
            ident = self.get_ident(request)
        else:
            ident = request.user

        return self.cache_format % {
            'scope': self.scope,
            'ident': ident
        }


class TestView(APIView):
    throttle_classes = [TestThrottle, ]

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        return Response('PUT请求,响应内容')

    def throttled(self, request, wait):
        """
        访问次数被限制时,定制错误信息
        """

        class Throttled(exceptions.Throttled):
            default_detail = '请求被限制.'
            extra_detail_singular = '请 {wait} 秒之后再重试.'
            extra_detail_plural = '请 {wait} 秒之后再重试.'

        raise Throttled(wait)

c. view中限制请求频率

代码语言:javascript
复制
REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_RATES': {
        'xxxxxx': '10/m',
    },
}
代码语言:javascript
复制
from django.conf.urls import url, include
from web.views import TestView

urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
代码语言:javascript
复制
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response

from rest_framework import exceptions
from rest_framework.throttling import ScopedRateThrottle


# 继承 ScopedRateThrottle
class TestThrottle(ScopedRateThrottle):

    def get_cache_key(self, request, view):
        """
        Should return a unique cache-key which can be used for throttling.
        Must be overridden.

        May return `None` if the request should not be throttled.
        """
        if not request.user:
            ident = self.get_ident(request)
        else:
            ident = request.user

        return self.cache_format % {
            'scope': self.scope,
            'ident': ident
        }


class TestView(APIView):
    throttle_classes = [TestThrottle, ]

    # 在settings中获取 xxxxxx 对应的频率限制值
    throttle_scope = "xxxxxx"

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        return Response('PUT请求,响应内容')

    def throttled(self, request, wait):
        """
        访问次数被限制时,定制错误信息
        """

        class Throttled(exceptions.Throttled):
            default_detail = '请求被限制.'
            extra_detail_singular = '请 {wait} 秒之后再重试.'
            extra_detail_plural = '请 {wait} 秒之后再重试.'

        raise Throttled(wait)

d. 匿名时用IP限制+登录时用Token限制

代码语言:javascript
复制
REST_FRAMEWORK = {
    'UNAUTHENTICATED_USER': None,
    'UNAUTHENTICATED_TOKEN': None,
    'DEFAULT_THROTTLE_RATES': {
        'luffy_anon': '10/m',
        'luffy_user': '20/m',
    },
}
代码语言:javascript
复制
from django.conf.urls import url, include
from web.views.s3_throttling import TestView

urlpatterns = [
    url(r'^test/', TestView.as_view()),
]
代码语言:javascript
复制
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response

from rest_framework.throttling import SimpleRateThrottle


class LuffyAnonRateThrottle(SimpleRateThrottle):
    """
    匿名用户,根据IP进行限制
    """
    scope = "luffy_anon"

    def get_cache_key(self, request, view):
        # 用户已登录,则跳过 匿名频率限制
        if request.user:
            return None

        return self.cache_format % {
            'scope': self.scope,
            'ident': self.get_ident(request)
        }


class LuffyUserRateThrottle(SimpleRateThrottle):
    """
    登录用户,根据用户token限制
    """
    scope = "luffy_user"

    def get_ident(self, request):
        """
        认证成功时:request.user是用户对象;request.auth是token对象
        :param request: 
        :return: 
        """
        # return request.auth.token
        return "user_token"

    def get_cache_key(self, request, view):
        """
        获取缓存key
        :param request: 
        :param view: 
        :return: 
        """
        # 未登录用户,则跳过 Token限制
        if not request.user:
            return None

        return self.cache_format % {
            'scope': self.scope,
            'ident': self.get_ident(request)
        }


class TestView(APIView):
    throttle_classes = [LuffyUserRateThrottle, LuffyAnonRateThrottle, ]

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        return Response('PUT请求,响应内容')

e. 全局使用

代码语言:javascript
复制
REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_CLASSES': [
        'api.utils.throttles.throttles.LuffyAnonRateThrottle',
        'api.utils.throttles.throttles.LuffyUserRateThrottle',
    ],
    'DEFAULT_THROTTLE_RATES': {
        'anon': '10/day',
        'user': '10/day',
        'luffy_anon': '10/m',
        'luffy_user': '20/m',
    },
}

下面来看看最简单的从源码中分析的示例,这只是举例说明了一下

代码语言:javascript
复制
1 from django.conf.urls import url
2 from app04 import views
3 urlpatterns = [
4     url('limit/',views.LimitView.as_view()),
5 
6 ]
代码语言:javascript
复制
 1 from django.shortcuts import render
 2 from rest_framework.views import APIView
 3 from rest_framework.response import Response
 4 from rest_framework import exceptions
 5 # from rest_framewor import
 6 # Create your views here.
 7 class MyThrottle(object):
 8     def allow_request(self,request,view):
 9         #返回False,限制
10         #返回True,不限制
11         pass
12     def wait(self):
13         return 1000
14 
15 
16 class LimitView(APIView):
17     authentication_classes = []  #不让认证用户
18     permission_classes = []  #不让验证权限
19     throttle_classes = [MyThrottle, ]
20     def get(self,request):
21         # self.dispatch
22         return Response('控制访问频率示例')
23 
24     def throttled(self, request, wait):
25         '''可定制方法设置中文错误'''
26         # raise exceptions.Throttled(wait)
27         class MyThrottle(exceptions.Throttled):
28             default_detail = '请求被限制'
29             extra_detail_singular = 'Expected available in {wait} second.'
30             extra_detail_plural = 'Expected available in {wait} seconds.'
31             default_code = '还需要再等{wait}秒'
32         raise MyThrottle(wait)

需求:对匿名用户进行限制,每个用户一分钟允许访问10次(只针对用户来说)

a、基于用户IP限制访问频率

流程分析:

  • 先获取用户信息,如果是匿名用户,获取IP。如果不是匿名用户就可以获取用户名。
  • 获取匿名用户IP,在request里面获取,比如IP= 1.1.1.1。
  • 吧获取到的IP添加到到recode字典里面,需要在添加之前先限制一下。
  • 如果时间间隔大于60秒,说明时间久远了,就把那个时间给剔除 了pop。在timelist列表里面现在留的是有效的访问时间段。
  • 然后判断他的访问次数超过了10次没有,如果超过了时间就return False。
  • 美中不足的是时间是固定的,我们改变他为动态的:列表里面最开始进来的时间和当前的时间进行比较,看需要等多久。

具体实现:

代码语言:javascript
复制
 1 from django.shortcuts import render
 2 from rest_framework.views import APIView
 3 from rest_framework.response import Response
 4 from rest_framework import exceptions
 5 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle  #限制访问频率
 6 import time
 7 # Create your views here.
 8 RECORD = {}
 9 class MyThrottle(BaseThrottle):
10 
11     def allow_request(self,request,view):
12         '''对匿名用户进行限制,每个用户一分钟访问10次 '''
13         ctime = time.time()
14         ip = '1.1.1.1'
15         if ip not in RECORD:
16             RECORD[ip] = [ctime]
17         else:
18             #[152042123,15204212,3152042,123152042123]
19             time_list = RECORD[ip]  #获取ip里面的值
20             while True:
21                 val = time_list[-1]#取出最后一个时间,也就是访问最早的时间
22                 if (ctime-60)>val:  #吧时间大于60秒的给剔除了
23                     time_list.pop()
24                 #剔除了之后timelist里面就是有效的时间了,在进行判断他的访问次数是不是超过10次
25                 else:
26                     break
27             if len(time_list) >10:
28                 return False        # 返回False,限制
29             time_list.insert(0, ctime)
30         return True   #返回True,不限制
31 
32     def wait(self):
33         ctime = time.time()
34         first_in_time = RECORD['1.1.1.1'][-1]
35         wt = 60-(ctime-first_in_time)
36         return wt
37 
38 
39 class LimitView(APIView):
40     authentication_classes = []  #不让认证用户
41     permission_classes = []  #不让验证权限
42     throttle_classes = [MyThrottle, ]
43     def get(self,request):
44         # self.dispatch
45         return Response('控制访问频率示例')
46 
47     def throttled(self, request, wait):
48         '''可定制方法设置中文错误'''
49         # raise exceptions.Throttled(wait)
50         class MyThrottle(exceptions.Throttled):
51             default_detail = '请求被限制'
52             extra_detail_singular = 'Expected available in {wait} second.'
53             extra_detail_plural = 'Expected available in {wait} seconds.'
54             default_code = '还需要再等{wait}秒'
55         raise MyThrottle(wait)
代码语言:javascript
复制
  1 # from django.shortcuts import render
  2 # from rest_framework.views import APIView
  3 # from rest_framework.response import Response
  4 # from rest_framework import exceptions
  5 # from rest_framework.throttling import BaseThrottle,SimpleRateThrottle  #限制访问频率
  6 # import time
  7 # # Create your views here.
  8 # RECORD = {}
  9 # class MyThrottle(BaseThrottle):
 10 #
 11 #     def allow_request(self,request,view):
 12 #         '''对匿名用户进行限制,每个用户一分钟访问10次 '''
 13 #         ctime = time.time()
 14 #         ip = '1.1.1.1'
 15 #         if ip not in RECORD:
 16 #             RECORD[ip] = [ctime]
 17 #         else:
 18 #             #[152042123,15204212,3152042,123152042123]
 19 #             time_list = RECORD[ip]  #获取ip里面的值
 20 #             while True:
 21 #                 val = time_list[-1]#取出最后一个时间,也就是访问最早的时间
 22 #                 if (ctime-60)>val:  #吧时间大于60秒的给剔除了
 23 #                     time_list.pop()
 24 #                 #剔除了之后timelist里面就是有效的时间了,在进行判断他的访问次数是不是超过10次
 25 #                 else:
 26 #                     break
 27 #             if len(time_list) >10:
 28 #                 return False        # 返回False,限制
 29 #             time_list.insert(0, ctime)
 30 #         return True   #返回True,不限制
 31 #
 32 #     def wait(self):
 33 #         ctime = time.time()
 34 #         first_in_time = RECORD['1.1.1.1'][-1]
 35 #         wt = 60-(ctime-first_in_time)
 36 #         return wt
 37 #
 38 #
 39 # class LimitView(APIView):
 40 #     authentication_classes = []  #不让认证用户
 41 #     permission_classes = []  #不让验证权限
 42 #     throttle_classes = [MyThrottle, ]
 43 #     def get(self,request):
 44 #         # self.dispatch
 45 #         return Response('控制访问频率示例')
 46 #
 47 #     def throttled(self, request, wait):
 48 #         '''可定制方法设置中文错误'''
 49 #         # raise exceptions.Throttled(wait)
 50 #         class MyThrottle(exceptions.Throttled):
 51 #             default_detail = '请求被限制'
 52 #             extra_detail_singular = 'Expected available in {wait} second.'
 53 #             extra_detail_plural = 'Expected available in {wait} seconds.'
 54 #             default_code = '还需要再等{wait}秒'
 55 #         raise MyThrottle(wait)
 56 
 57 
 58 
 59 from django.shortcuts import render
 60 from rest_framework.views import APIView
 61 from rest_framework.response import Response
 62 from rest_framework import exceptions
 63 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle  #限制访问频率
 64 import time
 65 # Create your views here.
 66 RECORD = {}
 67 class MyThrottle(BaseThrottle):
 68 
 69     def allow_request(self,request,view):
 70         '''对匿名用户进行限制,每个用户一分钟访问10次 '''
 71         ctime = time.time()
 72         self.ip =self.get_ident(request)
 73         if self.ip not in RECORD:
 74             RECORD[self.ip] = [ctime]
 75         else:
 76             #[152042123,15204212,3152042,123152042123]
 77             time_list = RECORD[self.ip]  #获取ip里面的值
 78             while True:
 79                 val = time_list[-1]#取出最后一个时间,也就是访问最早的时间
 80                 if (ctime-60)>val:  #吧时间大于60秒的给剔除了
 81                     time_list.pop()
 82                 #剔除了之后timelist里面就是有效的时间了,在进行判断他的访问次数是不是超过10次
 83                 else:
 84                     break
 85             if len(time_list) >10:
 86                 return False        # 返回False,限制
 87             time_list.insert(0, ctime)
 88         return True   #返回True,不限制
 89 
 90     def wait(self):
 91         ctime = time.time()
 92         first_in_time = RECORD[self.ip][-1]
 93         wt = 60-(ctime-first_in_time)
 94         return wt
 95 
 96 
 97 class LimitView(APIView):
 98     authentication_classes = []  #不让认证用户
 99     permission_classes = []  #不让验证权限
100     throttle_classes = [MyThrottle, ]
101     def get(self,request):
102         # self.dispatch
103         return Response('控制访问频率示例')
104 
105     def throttled(self, request, wait):
106         '''可定制方法设置中文错误'''
107         # raise exceptions.Throttled(wait)
108         class MyThrottle(exceptions.Throttled):
109             default_detail = '请求被限制'
110             extra_detail_singular = 'Expected available in {wait} second.'
111             extra_detail_plural = 'Expected available in {wait} seconds.'
112             default_code = '还需要再等{wait}秒'
113         raise MyThrottle(wait)

b、用resetframework内部的限制访问频率(利于Django缓存)

源码分析:

代码语言:javascript
复制
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle  #限制访问频率
代码语言:javascript
复制
 1 class BaseThrottle(object):
 2     """
 3     Rate throttling of requests.
 4     """
 5 
 6     def allow_request(self, request, view):
 7         """
 8         Return `True` if the request should be allowed, `False` otherwise.
 9         """
10         raise NotImplementedError('.allow_request() must be overridden')
11 
12     def get_ident(self, request):  #唯一标识
13         """
14         Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
15         if present and number of proxies is > 0. If not use all of
16         HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
17         """
18         xff = request.META.get('HTTP_X_FORWARDED_FOR')
19         remote_addr = request.META.get('REMOTE_ADDR') #获取IP等
20         num_proxies = api_settings.NUM_PROXIES
21 
22         if num_proxies is not None:
23             if num_proxies == 0 or xff is None:
24                 return remote_addr
25             addrs = xff.split(',')
26             client_addr = addrs[-min(num_proxies, len(addrs))]
27             return client_addr.strip()
28 
29         return ''.join(xff.split()) if xff else remote_addr
30 
31     def wait(self):
32         """
33         Optionally, return a recommended number of seconds to wait before
34         the next request.
35         """
36         return None
代码语言:javascript
复制
  1 class SimpleRateThrottle(BaseThrottle):
  2     """
  3     一个简单的缓存实现,只需要` get_cache_key() `。被覆盖。
  4     速率(请求/秒)是由视图上的“速率”属性设置的。类。该属性是一个字符串的形式number_of_requests /期。
  5     周期应该是:(的),“秒”,“M”,“min”,“h”,“小时”,“D”,“一天”。
  6     以前用于节流的请求信息存储在高速缓存中。
  7     A simple cache implementation, that only requires `.get_cache_key()`
  8     to be overridden.
  9 
 10     The rate (requests / seconds) is set by a `rate` attribute on the View
 11     class.  The attribute is a string of the form 'number_of_requests/period'.
 12 
 13     Period should be one of: ('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day')
 14 
 15     Previous request information used for throttling is stored in the cache.
 16     """
 17     cache = default_cache
 18     timer = time.time
 19     cache_format = 'throttle_%(scope)s_%(ident)s'
 20     scope = None
 21     THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES
 22 
 23     def __init__(self):
 24         if not getattr(self, 'rate', None):
 25             self.rate = self.get_rate()
 26         self.num_requests, self.duration = self.parse_rate(self.rate)
 27 
 28     def get_cache_key(self, request, view):#这个相当于是一个半成品,我们可以来补充它
 29         """
 30         Should return a unique cache-key which can be used for throttling.
 31         Must be overridden.
 32 
 33         May return `None` if the request should not be throttled.
 34         """
 35         raise NotImplementedError('.get_cache_key() must be overridden')
 36 
 37     def get_rate(self):
 38         """
 39         Determine the string representation of the allowed request rate.
 40         """
 41         if not getattr(self, 'scope', None):
 42             msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
 43                    self.__class__.__name__)
 44             raise ImproperlyConfigured(msg)
 45 
 46         try:
 47             return self.THROTTLE_RATES[self.scope]
 48         except KeyError:
 49             msg = "No default throttle rate set for '%s' scope" % self.scope
 50             raise ImproperlyConfigured(msg)
 51 
 52     def parse_rate(self, rate):
 53         """
 54         Given the request rate string, return a two tuple of:
 55         <allowed number of requests>, <period of time in seconds>
 56         """
 57         if rate is None:
 58             return (None, None)
 59         num, period = rate.split('/')
 60         num_requests = int(num)
 61         duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
 62         return (num_requests, duration)
 63 
 64     #1、一进来会先执行他,
 65     def allow_request(self, request, view):
 66         """
 67         Implement the check to see if the request should be throttled.
 68 
 69         On success calls `throttle_success`.
 70         On failure calls `throttle_failure`.
 71         """
 72         if self.rate is None:
 73             return True
 74 
 75         self.key = self.get_cache_key(request, view)  #2、执行get_cache_key,这里的self.key就相当于我们举例ip
 76         if self.key is None:
 77             return True
 78 
 79         self.history = self.cache.get(self.key, [])  #3、得到的key,默认是一个列表,赋值给了self.history,
 80                                                         # 这时候self.history就是每一个ip对应的访问记录
 81         self.now = self.timer()
 82 
 83         # Drop any requests from the history which have now passed the
 84         # throttle duration
 85         while self.history and self.history[-1] <= self.now - self.duration:
 86             self.history.pop()
 87         if len(self.history) >= self.num_requests:
 88             return self.throttle_failure()
 89         return self.throttle_success()
 90 
 91     def throttle_success(self):
 92         """
 93         Inserts the current request's timestamp along with the key
 94         into the cache.
 95         """
 96         self.history.insert(0, self.now)
 97         self.cache.set(self.key, self.history, self.duration)
 98         return True
 99 
100     def throttle_failure(self):
101         """
102         Called when a request to the API has failed due to throttling.
103         """
104         return False
105 
106     def wait(self):
107         """
108         Returns the recommended next request time in seconds.
109         """
110         if self.history:
111             remaining_duration = self.duration - (self.now - self.history[-1])
112         else:
113             remaining_duration = self.duration
114 
115         available_requests = self.num_requests - len(self.history) + 1
116         if available_requests <= 0:
117             return None
118 
119         return remaining_duration / float(available_requests)

请求一进来会先执行SimpleRateThrottle这个类的构造方法

代码语言:javascript
复制
1  def __init__(self):
2         if not getattr(self, 'rate', None):
3             self.rate = self.get_rate()  #点进去看到需要些一个scope  ,2/m
4         self.num_requests, self.duration = self.parse_rate(self.rate)
代码语言:javascript
复制
 1     def get_rate(self):
 2         """
 3         Determine the string representation of the allowed request rate.
 4         """
 5         if not getattr(self, 'scope', None):  #检测必须有scope,没有就报错了
 6             msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
 7                    self.__class__.__name__)
 8             raise ImproperlyConfigured(msg)
 9 
10         try:
11             return self.THROTTLE_RATES[self.scope]
12         except KeyError:
13             msg = "No default throttle rate set for '%s' scope" % self.scope
14             raise ImproperlyConfigured(msg)
代码语言:javascript
复制
 1   def parse_rate(self, rate):
 2         """
 3         Given the request rate string, return a two tuple of:
 4         <allowed number of requests>, <period of time in seconds>
 5         """
 6         if rate is None:
 7             return (None, None)
 8         num, period = rate.split('/')
 9         num_requests = int(num)
10         duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
11         return (num_requests, duration)
代码语言:javascript
复制
 1   #2、接下来会先执行他,
 2     def allow_request(self, request, view):
 3         """
 4         Implement the check to see if the request should be throttled.
 5 
 6         On success calls `throttle_success`.
 7         On failure calls `throttle_failure`.
 8         """
 9         if self.rate is None:
10             return True
11 
12         self.key = self.get_cache_key(request, view)  #2、执行get_cache_key,这里的self.key就相当于我们举例ip
13         if self.key is None:
14             return True  #不限制
15         # [114521212,11452121211,45212121145,21212114,521212]
16         self.history = self.cache.get(self.key, [])  #3、得到的key,默认是一个列表,赋值给了self.history,
17                                                         # 这时候self.history就是每一个ip对应的访问记录
18         self.now = self.timer()
19 
20         # Drop any requests from the history which have now passed the
21         # throttle duration
22         while self.history and self.history[-1] <= self.now - self.duration:
23             self.history.pop()
24         if len(self.history) >= self.num_requests:
25             return self.throttle_failure()
26         return self.throttle_success()
代码语言:javascript
复制
 1     def wait(self):
 2         """
 3         Returns the recommended next request time in seconds.
 4         """
 5         if self.history:
 6             remaining_duration = self.duration - (self.now - self.history[-1])
 7         else:
 8             remaining_duration = self.duration
 9 
10         available_requests = self.num_requests - len(self.history) + 1
11         if available_requests <= 0:
12             return None
13 
14         return remaining_duration / float(available_requests)

代码实现:

代码语言:javascript
复制
 1 ###########用resetframework内部的限制访问频率##############
 2 class MySimpleRateThrottle(SimpleRateThrottle):
 3     scope = 'xxx'
 4     def get_cache_key(self, request, view):
 5         return self.get_ident(request)  #返回唯一标识IP
 6 
 7 class LimitView(APIView):
 8     authentication_classes = []  #不让认证用户
 9     permission_classes = []  #不让验证权限
10     throttle_classes = [MySimpleRateThrottle, ]
11     def get(self,request):
12         # self.dispatch
13         return Response('控制访问频率示例')
14 
15     def throttled(self, request, wait):
16         '''可定制方法设置中文错误'''
17         # raise exceptions.Throttled(wait)
18         class MyThrottle(exceptions.Throttled):
19             default_detail = '请求被限制'
20             extra_detail_singular = 'Expected available in {wait} second.'
21             extra_detail_plural = 'Expected available in {wait} seconds.'
22             default_code = '还需要再等{wait}秒'
23         raise MyThrottle(wait)

记得在settings里面配置

代码语言:javascript
复制
 1 REST_FRAMEWORK = {
 2     'UNAUTHENTICATED_USER': None,
 3     'UNAUTHENTICATED_TOKEN': None,  #将匿名用户设置为None
 4     "DEFAULT_AUTHENTICATION_CLASSES": [
 5         "app01.utils.MyAuthentication",
 6     ],
 7     'DEFAULT_PERMISSION_CLASSES':[
 8         # "app03.utils.MyPermission",#设置路径,
 9     ],
10     'DEFAULT_THROTTLE_RATES':{
11         'xxx':'2/minute'  #2分钟
12     }
13 }
14 
15 #缓存:放在文件
16 CACHES = {
17     'default': {
18         'BACKEND': 'django.core.cache.backends.filebased.FileBasedCache',
19         'LOCATION': 'cache',   #文件路径
20     }
21 }

对匿名用户进行限制,每个用户1分钟允许访问5次,对于登录的普通用户1分钟访问10次,VIP用户一分钟访问20次

  • 比如首页可以匿名访问
  • #先认证,只有认证了才知道是不是匿名的,
  • #权限登录成功之后才能访问, ,index页面就不需要权限了
  • If request.user #判断登录了没有
代码语言:javascript
复制
1 from django.contrib import admin
2 
3 from django.conf.urls import url, include
4 from app05 import views
5 
6 urlpatterns = [
7     url('index/',views.IndexView.as_view()),
8     url('manage/',views.ManageView.as_view()),
9 ]
代码语言:javascript
复制
 1 from django.shortcuts import render
 2 from rest_framework.views import APIView
 3 from rest_framework.response import Response
 4 from rest_framework.authentication import BaseAuthentication  #认证需要
 5 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限流处理
 6 from rest_framework.permissions import BasePermission
 7 from rest_framework import exceptions
 8 from app01 import models
 9 # Create your views here.
10 ###############3##认证#####################
11 class MyAuthentcate(BaseAuthentication):
12     '''检查用户是否存在,如果存在就返回user和auth,如果没有就返回'''
13     def authenticate(self, request):
14         token = request.query_params.get('token')
15         obj = models.UserInfo.objects.filter(token=token).first()
16         if obj:
17             return (obj.username,obj.token)
18         return None  #表示我不处理
19 
20 ##################权限#####################
21 class MyPermission(BasePermission):
22     message='无权访问'
23     def has_permission(self, request, view):
24         if request.user:
25             return True  #true表示有权限
26         return False  #false表示无权限
27 
28 class AdminPermission(BasePermission):
29     message = '无权访问'
30 
31     def has_permission(self, request, view):
32         if request.user=='haiyan':
33             return True  # true表示有权限
34         return False  # false表示无权限
35 
36 ############3#####限流##################3##
37 class AnonThrottle(SimpleRateThrottle):
38     scope = 'wdp_anon'  #相当于设置了最大的访问次数和时间
39     def get_cache_key(self, request, view):
40         if request.user:
41             return None  #返回None表示我不限制,登录用户我不管
42         #匿名用户
43         return self.get_ident(request)  #返回一个唯一标识IP
44 
45 class UserThrottle(SimpleRateThrottle):
46     scope = 'wdp_user'
47     def get_cache_key(self, request, view):
48         #登录用户
49         if request.user:
50             return request.user
51         return None  #返回NOne表示匿名用户我不管
52 
53 
54 ##################视图#####################
55 #首页支持匿名访问,
56 #无需要登录就可以访问
57 class IndexView(APIView):
58     authentication_classes = [MyAuthentcate,]   #认证判断他是不是匿名用户
59     permission_classes = []   #一般主页就不需要权限验证了
60     throttle_classes = [AnonThrottle,UserThrottle,]  #对匿名用户和普通用户的访问限制
61 
62     def get(self,request):
63         # self.dispatch
64         return Response('访问首页')
65 
66     def throttled(self, request, wait):
67         '''可定制方法设置中文错误'''
68 
69         # raise exceptions.Throttled(wait)
70         class MyThrottle(exceptions.Throttled):
71             default_detail = '请求被限制'
72             extra_detail_singular = 'Expected available in {wait} second.'
73             extra_detail_plural = 'Expected available in {wait} seconds.'
74             default_code = '还需要再等{wait}秒'
75 
76         raise MyThrottle(wait)
77 
78 #需登录就可以访问
79 class ManageView(APIView):
80     authentication_classes = [MyAuthentcate, ]  # 认证判断他是不是匿名用户
81     permission_classes = [MyPermission,]  # 一般主页就不需要权限验证了
82     throttle_classes = [AnonThrottle, UserThrottle, ]  # 对匿名用户和普通用户的访问限制
83 
84     def get(self, request):
85         # self.dispatch
86         return Response('管理人员访问页面')
87 
88     def throttled(self, request, wait):
89         '''可定制方法设置中文错误'''
90 
91         # raise exceptions.Throttled(wait)
92         class MyThrottle(exceptions.Throttled):
93             default_detail = '请求被限制'
94             extra_detail_singular = 'Expected available in {wait} second.'
95             extra_detail_plural = 'Expected available in {wait} seconds.'
96             default_code = '还需要再等{wait}秒'
97 
98         raise MyThrottle(wait)

四、总结

1、认证:就是检查用户是否存在;如果存在返回(request.user,request.auth);不存在request.user/request.auth=None

2、权限:进行职责的划分

3、限制访问频率

代码语言:javascript
复制
认证
    - 类:authenticate/authenticate_header ##验证不成功的时候执行的
    - 返回值:
        - return None,
        - return (user,auth),
        - raise 异常
    - 配置:
        - 视图:
            class IndexView(APIView):
                authentication_classes = [MyAuthentication,]
        - 全局:
            REST_FRAMEWORK = {
                    'UNAUTHENTICATED_USER': None,
                    'UNAUTHENTICATED_TOKEN': None,
                    "DEFAULT_AUTHENTICATION_CLASSES": [
                        # "app02.utils.MyAuthentication",
                    ],
            }

权限 
    - 类:has_permission/has_object_permission
    - 返回值: 
        - True、#有权限
        - False、#无权限
        - exceptions.PermissionDenied(detail="错误信息")  #异常自己随意,想抛就抛,错误信息自己指定
    - 配置:
        - 视图:
            class IndexView(APIView):
                permission_classes = [MyPermission,]
        - 全局:
            REST_FRAMEWORK = {
                    "DEFAULT_PERMISSION_CLASSES": [
                        # "app02.utils.MyAuthentication",
                    ],
            }
限流
    - 类:allow_request/wait PS: scope = "wdp_user"
    - 返回值:
      return True、#不限制
      return False  #限制
    - 配置: 
            - 视图: 
                class IndexView(APIView):
                    
                    throttle_classes=[AnonThrottle,UserThrottle,]
                    def get(self,request,*args,**kwargs):
                        self.dispatch
                        return Response('访问首页')
            - 全局
                REST_FRAMEWORK = {
                    "DEFAULT_THROTTLE_CLASSES":[
                    
                    ],
                    'DEFAULT_THROTTLE_RATES':{
                        'wdp_anon':'5/minute',
                        'wdp_user':'10/minute',
                    }
                }
    
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-02-16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、认证和授权
  • 二、权限
  • 三、用户访问次数/频率限制
    • 1、为什么要限流呢
      • 2、限制访问频率源码分析
        • 3.方法
        • 四、总结
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档