Files
pyGoEdge-UserPanel/accounts/views.py

227 lines
10 KiB
Python
Raw Normal View History

2025-11-18 03:36:49 +08:00
from django.shortcuts import render, redirect
import logging
from django.contrib.auth import authenticate, login as auth_login, logout as auth_logout
from django.contrib.auth.decorators import login_required
from django.contrib import messages
from django.db import transaction
from django.db.models import Q
from django.contrib.auth import get_user_model
from django.utils import timezone
from django.views.decorators.http import require_POST
from django.core.exceptions import ValidationError
import re
from core.models import SystemSettings
from .models import UserProfile, LoginRecord
from .forms import LoginForm, RegisterForm, ProfileForm, SimplePasswordChangeForm
# Resolve the project's active user model through Django's get_user_model()
# rather than importing a concrete User class directly.
User = get_user_model()
def _get_settings():
    """Return the SystemSettings row with the lowest id.

    When the query finds no row, a new one is created with
    ``SystemSettings.objects.create()`` (no explicit field values) and
    returned instead.
    """
    existing = SystemSettings.objects.order_by('id').first()
    return existing if existing else SystemSettings.objects.create()
def _gen_captcha(request):
    """Create a single-digit addition challenge and store its answer.

    Args:
        request: Object exposing a dict-like ``session``. The expected answer
            is written to ``session['captcha_answer']`` as a decimal string,
            replacing any previous answer.

    Returns:
        The question text to show the user, e.g. ``"3 + 5 = ?"``.
    """
    # The answer gates login and registration, so the operands come from the
    # OS CSPRNG instead of the predictable Mersenne Twister behind ``random``.
    import secrets

    left = secrets.randbelow(9) + 1   # uniform over 1..9, same range as before
    right = secrets.randbelow(9) + 1
    request.session['captcha_answer'] = str(left + right)
    return f"{left} + {right} = ?"
def _client_ip(request):
    """Return the first X-Forwarded-For entry, falling back to REMOTE_ADDR."""
    # NOTE(review): X-Forwarded-For is client-controlled unless a trusted
    # reverse proxy overwrites it. Confirm the deployment does so; otherwise
    # the per-IP throttle can be sidestepped by spoofing this header.
    forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded:
        return forwarded.split(',')[0].strip()
    return request.META.get('REMOTE_ADDR')


def _render_login(request, form, enable_captcha):
    """Render the login page, issuing a fresh captcha when captcha is enabled."""
    question = _gen_captcha(request) if enable_captcha else None
    return render(request, 'accounts/login.html', {'form': form, 'captcha_question': question})


def login(request):
    """Handle the login page.

    GET renders an empty form. POST validates the form and the optional
    captcha, enforces the per-IP and per-account throttles, authenticates by
    username or e-mail, writes a LoginRecord and redirects to ``next``
    (default ``/domains/``).

    Every re-render of the form issues a fresh captcha question when captcha
    is enabled, including the invalid-form case, so the user always has a
    challenge that matches the answer stored in the session.
    """
    settings_obj = _get_settings()
    enable_captcha = getattr(settings_obj, 'captcha_enabled', False)

    if request.method != 'POST':
        return _render_login(request, LoginForm(enable_captcha=enable_captcha), enable_captcha)

    form = LoginForm(request.POST, enable_captcha=enable_captcha)
    if not form.is_valid():
        # Previously this path rendered with captcha_question=None, leaving
        # the user without a visible challenge.
        return _render_login(request, form, enable_captcha)

    login_key = form.cleaned_data['login'].strip()
    password = form.cleaned_data['password']

    if enable_captcha:
        cap = (form.cleaned_data.get('captcha', '') or '').strip()
        if cap != request.session.get('captcha_answer'):
            messages.error(request, '验证码错误')
            return _render_login(request, form, enable_captcha)

    # Accept either the username or the e-mail address as the login key.
    try:
        user = User.objects.filter(
            Q(username__iexact=login_key) | Q(email__iexact=login_key)
        ).first()
    except Exception:
        # Best effort: fall through to authenticate() with the raw key, but
        # leave a trace instead of swallowing the error silently.
        logger.exception('login user lookup failed')
        user = None

    ip = _client_ip(request)

    from .models import LoginThrottle
    BAN_THRESHOLD = 5
    BAN_MINUTES = 15

    ip_throttle, _ = LoginThrottle.objects.get_or_create(user=None, ip_address=ip)
    if ip_throttle.is_banned():
        messages.error(request, '登录受限:该 IP 暂时被封禁,请稍后再试')
        return _render_login(request, form, enable_captcha)

    user_throttle = None
    if user:
        user_throttle, _ = LoginThrottle.objects.get_or_create(user=user, ip_address=None)
        if user_throttle.is_banned():
            messages.error(request, '登录受限:该账号暂时被封禁,请稍后再试')
            return _render_login(request, form, enable_captcha)

    auth_username = user.username if user else login_key
    user_auth = authenticate(request, username=auth_username, password=password)

    if user_auth is None:
        messages.error(request, '登录失败:账号或密码错误')
        try:
            ip_throttle.register_failure(ban_threshold=BAN_THRESHOLD, ban_minutes=BAN_MINUTES)
            if user_throttle:
                user_throttle.register_failure(ban_threshold=BAN_THRESHOLD, ban_minutes=BAN_MINUTES)
        except Exception:
            logger.exception('login throttle write failed')
        return _render_login(request, form, enable_captcha)

    auth_login(request, user_auth)

    # Record login history; a failure here must not block the login itself.
    try:
        ua = request.META.get('HTTP_USER_AGENT', '')[:255]
        LoginRecord.objects.create(user=user_auth, ip_address=ip, user_agent=ua)
    except Exception:
        logger.exception('login record write failed')

    try:
        ip_throttle.register_success()
        if user_throttle:
            user_throttle.register_success()
    except Exception:
        logger.exception('login throttle cleanup failed')

    messages.success(request, '登录成功')

    # ``next`` comes from the query string and is attacker-controllable, so
    # only follow it when it stays on this host; otherwise use the default.
    from django.utils.http import url_has_allowed_host_and_scheme
    default_url = '/domains/'
    next_url = request.GET.get('next') or default_url
    if not url_has_allowed_host_and_scheme(
        next_url,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    ):
        next_url = default_url
    return redirect(next_url)
def _render_register(request, form, enable_captcha):
    """Render the registration page, issuing a fresh captcha when enabled."""
    question = _gen_captcha(request) if enable_captcha else None
    return render(request, 'accounts/register.html', {'form': form, 'captcha_question': question})


def register(request):
    """Handle the registration page.

    GET renders an empty form. POST validates the form, the optional captcha
    and the password strength policy, then creates the User and its
    UserProfile inside one transaction and redirects to the login page.

    Every re-render issues a fresh captcha question when captcha is enabled,
    including the invalid-form case, so the displayed challenge always matches
    the answer stored in the session.
    """
    settings_obj = _get_settings()
    enable_captcha = getattr(settings_obj, 'captcha_enabled', False)

    if request.method != 'POST':
        return _render_register(request, RegisterForm(enable_captcha=enable_captcha), enable_captcha)

    form = RegisterForm(request.POST, enable_captcha=enable_captcha)
    if not form.is_valid():
        # Previously this path rendered with captcha_question=None, leaving
        # the user without a visible challenge.
        return _render_register(request, form, enable_captcha)

    if enable_captcha:
        cap = (form.cleaned_data.get('captcha', '') or '').strip()
        if cap != request.session.get('captcha_answer'):
            messages.error(request, '验证码错误')
            return _render_register(request, form, enable_captcha)

    try:
        _validate_password_strength(form.cleaned_data['password1'])
    except ValidationError as e:
        messages.error(request, f'密码不符合安全要求:{e.messages[0]}')
        return _render_register(request, form, enable_captcha)

    # Create the account and its profile atomically so a failure in either
    # step leaves no half-created user behind.
    with transaction.atomic():
        user = User.objects.create_user(
            username=form.cleaned_data['username'],
            email=form.cleaned_data['email'],
            password=form.cleaned_data['password1'],
        )
        UserProfile.objects.create(
            user=user,
            display_name=form.cleaned_data.get('display_name', ''),
            contact_phone=form.cleaned_data.get('contact_phone', ''),
        )

    messages.success(request, '注册成功,请登录')
    return redirect('accounts:login')
@login_required
@require_POST
def logout(request):
    """Log the current user out and redirect to the login page.

    Only POST requests from an authenticated user reach the body, as enforced
    by the two decorators.
    """
    auth_logout(request)
    messages.success(request, '您已退出登录')
    login_page = '/accounts/login/'
    return redirect(login_page)
@login_required
def profile(request):
    """Show and update the signed-in user's profile.

    A UserProfile is created on the fly when the user has none. A valid POST
    saves ``display_name`` and ``contact_phone`` and redirects back to this
    page; otherwise the form is rendered together with the user's 20 most
    recent login records.
    """
    user_profile = getattr(request.user, 'profile', None)
    if not user_profile:
        user_profile = UserProfile.objects.create(user=request.user)

    if request.method != 'POST':
        initial_values = {
            'display_name': user_profile.display_name,
            'contact_phone': user_profile.contact_phone,
        }
        form = ProfileForm(initial=initial_values)
    else:
        form = ProfileForm(request.POST)
        if form.is_valid():
            cleaned = form.cleaned_data
            user_profile.display_name = cleaned.get('display_name', '')
            user_profile.contact_phone = cleaned.get('contact_phone', '')
            user_profile.save(update_fields=['display_name', 'contact_phone'])
            messages.success(request, '资料已更新')
            return redirect('accounts:profile')
        messages.error(request, '保存失败,请检查表单')

    # Recent login records, newest first.
    recent_logins = LoginRecord.objects.filter(user=request.user).order_by('-created_at')[:20]
    context = {'form': form, 'records': recent_logins}
    return render(request, 'accounts/profile.html', context)
@login_required
def password_change(request):
    """Let the signed-in user change their password.

    GET renders an empty form. POST first applies the password strength
    policy to ``new_password1`` and then validates the form. On success the
    new password is saved, the user is logged out, and the browser is
    redirected to the login page.
    """
    template = 'accounts/password_change.html'

    if request.method != 'POST':
        form = SimplePasswordChangeForm(user=request.user)
        return render(request, template, {'form': form})

    form = SimplePasswordChangeForm(user=request.user, data=request.POST)
    new_password = request.POST.get('new_password1')
    try:
        _validate_password_strength(new_password)
    except ValidationError as e:
        messages.error(request, f'密码不符合安全要求:{e.messages[0]}')
        return render(request, template, {'form': form})

    if form.is_valid():
        form.save()
        # The logout view in this module only accepts POST, so redirecting to
        # it (a GET) would be answered with 405 instead of signing the user
        # out. End the session here and go straight to the login page.
        # NOTE(review): assumes 'accounts:logout' routes to that view.
        auth_logout(request)
        messages.success(request, '密码已修改,请重新登录')
        return redirect('accounts:login')

    messages.error(request, '修改失败,请检查表单')
    return render(request, template, {'form': form})
@login_required
def login_history(request):
    """Render the signed-in user's 50 most recent login records, newest first."""
    recent_logins = LoginRecord.objects.filter(user=request.user).order_by('-created_at')[:50]
    context = {'records': recent_logins}
    return render(request, 'accounts/login_history.html', context)
# Module-level logger used by the views above. It is assigned after they are
# defined, which works because the name is only looked up when a view runs,
# by which time the module has been fully imported.
logger = logging.getLogger(__name__)
def _validate_password_strength(password: str):
    """Reject passwords that violate the strength policy.

    Rules, checked in order: not a well-known weak password, at least 12
    characters, not one character repeated throughout, and containing at least
    one lowercase letter, one uppercase letter, one digit and one special
    character (underscore included).

    Args:
        password: Candidate password. ``None`` or an empty string is reported
            as too short.

    Returns:
        None when the password satisfies every rule.

    Raises:
        ValidationError: For the first rule the password violates.
    """
    common_weak = {
        'password', '123456', '123456789', 'qwerty', 'abc123', '111111', '123123',
        'password1', 'admin', 'letmein'
    }
    # Checked before the length rule: every entry above is shorter than 12
    # characters, so after the length rule this test could never fire.
    if password and password.lower() in common_weak:
        raise ValidationError('密码过于常见,存在安全风险')
    if not password or len(password) < 12:
        raise ValidationError('密码过短,至少需要 12 位')
    if re.fullmatch(r'(.)\1{7,}', password):
        raise ValidationError('密码字符重复过多,存在安全风险')
    if not re.search(r'[a-z]', password):
        raise ValidationError('需至少包含一个小写字母')
    if not re.search(r'[A-Z]', password):
        raise ValidationError('需至少包含一个大写字母')
    if not re.search(r'\d', password):
        raise ValidationError('需至少包含一个数字')
    # ``\w`` matches the underscore, so ``[^\w\s]`` alone would not count it
    # as a special character; accept it explicitly.
    if not re.search(r'[^\w\s]|_', password):
        raise ValidationError('需至少包含一个特殊字符(如 !@#¥% 等)')