81 lines
3.1 KiB
Python
81 lines
3.1 KiB
Python
|
|
from django.db import models
|
|||
|
|
from django.contrib.auth import get_user_model
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Active user model for this project, resolved once at import time via
# get_user_model(); the relations defined in this module point at it.
User = get_user_model()
|
|||
|
|
|
|||
|
|
|
|||
|
|
class UserProfile(models.Model):
    """Extra per-account data stored one-to-one alongside the auth user."""

    # Owning account, reachable from the user side as ``user.profile``.
    # CASCADE: the profile row is deleted together with its user.
    user = models.OneToOneField(
        User,
        related_name='profile',
        on_delete=models.CASCADE,
    )
    # Optional label; when non-empty it is what __str__ returns.
    display_name = models.CharField(blank=True, default='', max_length=100)
    # Optional free-form phone string (no format validation is applied here).
    contact_phone = models.CharField(blank=True, default='', max_length=30)
    # Nullable, non-negative integer.
    # NOTE(review): presumably a per-user override of a default free-traffic
    # allowance in GB per domain, with NULL meaning "no override" -- confirm
    # against the code that reads this field.
    default_free_traffic_gb_per_domain_override = models.PositiveIntegerField(
        blank=True,
        null=True,
    )

    class Meta:
        verbose_name_plural = '用户资料'
        verbose_name = '用户资料'

    def __str__(self):
        """Return the display name, falling back to the account username."""
        if self.display_name:
            return self.display_name
        return self.user.get_username()
|
|||
|
|
|
|||
|
|
# Create your models here.
|
|||
|
|
|
|||
|
|
|
|||
|
|
class LoginRecord(models.Model):
    """Login-history entry: which user, from which address and client, and when."""

    # Account the entry belongs to, reachable as ``user.login_records``.
    # CASCADE: entries are deleted together with their user.
    user = models.ForeignKey(
        User,
        related_name='login_records',
        on_delete=models.CASCADE,
    )
    # IPv4 or IPv6 address; may be NULL when no address was stored.
    ip_address = models.GenericIPAddressField(blank=True, null=True)
    # Client user-agent string; the column holds at most 255 characters.
    user_agent = models.CharField(blank=True, default='', max_length=255)
    # Set automatically when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        # Newest entries first.
        ordering = ['-created_at']
        verbose_name_plural = '登录历史'
        verbose_name = '登录历史'

    def __str__(self):
        """Return ``"<username> @ <created_at>"`` for admin and debug output."""
        username = self.user.get_username()
        return f"{username} @ {self.created_at}"
|
|||
|
|
|
|||
|
|
|
|||
|
|
class LoginThrottle(models.Model):
    """Failed-login counters and a timed ban for a user and/or an IP address.

    ``register_failure`` bumps the counters and starts a ban once the running
    count reaches a threshold; ``register_success`` clears the running count
    and any ban; ``is_banned`` reports whether a ban is currently in force.
    """

    # Optional account link; SET_NULL keeps the throttle row (now user-less)
    # when the account is deleted.
    user = models.ForeignKey(User, null=True, blank=True, on_delete=models.SET_NULL, related_name='login_throttles')
    # Optional IPv4/IPv6 address this row is tracking.
    ip_address = models.GenericIPAddressField(null=True, blank=True)
    # Running failure count; reset to 0 when a ban starts or on success.
    fail_count = models.PositiveIntegerField(default=0)
    # Failure count that the methods in this class only ever increment.
    total_fail_count = models.PositiveIntegerField(default=0)
    # Timestamp of the most recent registered failure.
    last_failed_at = models.DateTimeField(null=True, blank=True)
    # End of the current ban; NULL or a past time means "not banned".
    banned_until = models.DateTimeField(null=True, blank=True)
    # Free-text remark; register_failure stores the auto-ban reason here.
    note = models.CharField(max_length=255, blank=True, default='')

    class Meta:
        verbose_name = '登录节流'
        verbose_name_plural = '登录节流'
        # Most recently failing rows first.
        ordering = ['-last_failed_at']
        indexes = [
            models.Index(fields=['ip_address']),
            models.Index(fields=['user']),
            models.Index(fields=['banned_until']),
        ]

    def is_banned(self):
        """Return True while ``banned_until`` is set and still in the future."""
        from django.utils import timezone

        return bool(self.banned_until and self.banned_until > timezone.now())

    def register_failure(self, ban_threshold=5, ban_minutes=15):
        """Record one failed attempt and start a ban once the threshold is hit.

        Args:
            ban_threshold: Running failure count at which a ban starts.
                Coerced with ``int()``.
            ban_minutes: Length of the ban in minutes. Coerced with ``int()``.

        Raises:
            TypeError, ValueError: If either argument cannot be converted to
                ``int``. The conversion happens before any field is changed.

        The changed fields are written with ``save(update_fields=...)``, so
        the row is expected to exist in the database already.
        """
        from datetime import timedelta

        from django.utils import timezone

        # Coerce both limits up front so that the comparison below, the ban
        # duration and the stored note all use the same integer values, and
        # so that a bad argument fails before the counters are modified.
        ban_threshold = int(ban_threshold)
        ban_minutes = int(ban_minutes)

        now = timezone.now()
        self.fail_count += 1
        self.total_fail_count += 1
        self.last_failed_at = now
        if self.fail_count >= ban_threshold:
            self.banned_until = now + timedelta(minutes=ban_minutes)
            self.note = f'自动封禁 {ban_minutes} 分钟(超过失败阈值 {ban_threshold})'
            # Start counting from zero again once the ban is in place.
            self.fail_count = 0
        self.save(update_fields=['fail_count', 'total_fail_count', 'last_failed_at', 'banned_until', 'note'])

    def register_success(self):
        """Clear the running failure count, the ban and the note.

        Nothing is written when there is neither a running failure count nor
        a ``banned_until`` value; ``total_fail_count`` is never touched.
        """
        if self.fail_count or self.banned_until:
            self.fail_count = 0
            self.banned_until = None
            self.note = ''
            self.save(update_fields=['fail_count', 'banned_until', 'note'])
|