from django.shortcuts import render, redirect import logging from django.contrib.auth import authenticate, login as auth_login, logout as auth_logout from django.contrib.auth.decorators import login_required from django.contrib import messages from django.db import transaction from django.db.models import Q from django.contrib.auth import get_user_model from django.utils import timezone from django.views.decorators.http import require_POST from django.core.exceptions import ValidationError import re from core.models import SystemSettings from .models import UserProfile, LoginRecord from .forms import LoginForm, RegisterForm, ProfileForm, SimplePasswordChangeForm User = get_user_model() def _get_settings(): settings_obj = SystemSettings.objects.order_by('id').first() if not settings_obj: settings_obj = SystemSettings.objects.create() return settings_obj def _gen_captcha(request): import random a, b = random.randint(1, 9), random.randint(1, 9) request.session['captcha_answer'] = str(a + b) return f"{a} + {b} = ?" def login(request): settings_obj = _get_settings() enable_captcha = getattr(settings_obj, 'captcha_enabled', False) question = None if request.method == 'POST': form = LoginForm(request.POST, enable_captcha=enable_captcha) if form.is_valid(): login_key = form.cleaned_data['login'].strip() password = form.cleaned_data['password'] if enable_captcha: cap = form.cleaned_data.get('captcha', '').strip() if cap != request.session.get('captcha_answer'): messages.error(request, '验证码错误') question = _gen_captcha(request) return render(request, 'accounts/login.html', {'form': form, 'captcha_question': question}) user = None # 支持用户名或邮箱登录 try: user = User.objects.filter(Q(username__iexact=login_key) | Q(email__iexact=login_key)).first() except Exception: user = None ip = request.META.get('HTTP_X_FORWARDED_FOR') if ip: ip = ip.split(',')[0].strip() else: ip = request.META.get('REMOTE_ADDR') from .models import LoginThrottle BAN_THRESHOLD = 5 BAN_MINUTES = 15 ip_throttle, _ = LoginThrottle.objects.get_or_create(user=None, ip_address=ip) if ip_throttle.is_banned(): messages.error(request, '登录受限:该 IP 暂时被封禁,请稍后再试') question = _gen_captcha(request) if enable_captcha else None return render(request, 'accounts/login.html', {'form': form, 'captcha_question': question}) user_throttle = None if user: user_throttle, _ = LoginThrottle.objects.get_or_create(user=user, ip_address=None) if user_throttle.is_banned(): messages.error(request, '登录受限:该账号暂时被封禁,请稍后再试') question = _gen_captcha(request) if enable_captcha else None return render(request, 'accounts/login.html', {'form': form, 'captcha_question': question}) auth_username = user.username if user else login_key user_auth = authenticate(request, username=auth_username, password=password) if user_auth is not None: auth_login(request, user_auth) # 记录登录历史 try: ua = request.META.get('HTTP_USER_AGENT', '')[:255] LoginRecord.objects.create(user=user_auth, ip_address=ip, user_agent=ua) except Exception: logger.exception('login record write failed') try: ip_throttle.register_success() if user_throttle: user_throttle.register_success() except Exception: logger.exception('login throttle cleanup failed') messages.success(request, '登录成功') next_url = request.GET.get('next') or '/domains/' return redirect(next_url) else: messages.error(request, '登录失败:账号或密码错误') try: ip_throttle.register_failure(ban_threshold=BAN_THRESHOLD, ban_minutes=BAN_MINUTES) if user: user_throttle.register_failure(ban_threshold=BAN_THRESHOLD, ban_minutes=BAN_MINUTES) except Exception: logger.exception('login throttle write failed') question = _gen_captcha(request) if enable_captcha else None else: form = LoginForm(enable_captcha=enable_captcha) question = _gen_captcha(request) if enable_captcha else None return render(request, 'accounts/login.html', {'form': form, 'captcha_question': question}) def register(request): settings_obj = _get_settings() enable_captcha = getattr(settings_obj, 'captcha_enabled', False) question = None if request.method == 'POST': form = RegisterForm(request.POST, enable_captcha=enable_captcha) if form.is_valid(): if enable_captcha: cap = form.cleaned_data.get('captcha', '').strip() if cap != request.session.get('captcha_answer'): messages.error(request, '验证码错误') question = _gen_captcha(request) return render(request, 'accounts/register.html', {'form': form, 'captcha_question': question}) try: _validate_password_strength(form.cleaned_data['password1']) except ValidationError as e: messages.error(request, f'密码不符合安全要求:{e.messages[0]}') question = _gen_captcha(request) if enable_captcha else None return render(request, 'accounts/register.html', {'form': form, 'captcha_question': question}) with transaction.atomic(): user = User.objects.create_user( username=form.cleaned_data['username'], email=form.cleaned_data['email'], password=form.cleaned_data['password1'], ) UserProfile.objects.create( user=user, display_name=form.cleaned_data.get('display_name', ''), contact_phone=form.cleaned_data.get('contact_phone', ''), ) messages.success(request, '注册成功,请登录') return redirect('accounts:login') else: form = RegisterForm(enable_captcha=enable_captcha) question = _gen_captcha(request) if enable_captcha else None return render(request, 'accounts/register.html', {'form': form, 'captcha_question': question}) @login_required @require_POST def logout(request): auth_logout(request) messages.success(request, '您已退出登录') return redirect('/accounts/login/') @login_required def profile(request): profile = getattr(request.user, 'profile', None) if not profile: profile = UserProfile.objects.create(user=request.user) if request.method == 'POST': form = ProfileForm(request.POST) if form.is_valid(): profile.display_name = form.cleaned_data.get('display_name', '') profile.contact_phone = form.cleaned_data.get('contact_phone', '') profile.save(update_fields=['display_name', 'contact_phone']) messages.success(request, '资料已更新') return redirect('accounts:profile') else: messages.error(request, '保存失败,请检查表单') else: form = ProfileForm(initial={ 'display_name': profile.display_name, 'contact_phone': profile.contact_phone, }) # 最近登录记录 records = LoginRecord.objects.filter(user=request.user).order_by('-created_at')[:20] return render(request, 'accounts/profile.html', {'form': form, 'records': records}) @login_required def password_change(request): if request.method == 'POST': form = SimplePasswordChangeForm(user=request.user, data=request.POST) new_password = request.POST.get('new_password1') try: _validate_password_strength(new_password) except ValidationError as e: messages.error(request, f'密码不符合安全要求:{e.messages[0]}') return render(request, 'accounts/password_change.html', {'form': form}) if form.is_valid(): form.save() messages.success(request, '密码已修改,请重新登录') return redirect('accounts:logout') else: messages.error(request, '修改失败,请检查表单') else: form = SimplePasswordChangeForm(user=request.user) return render(request, 'accounts/password_change.html', {'form': form}) @login_required def login_history(request): records = LoginRecord.objects.filter(user=request.user).order_by('-created_at')[:50] return render(request, 'accounts/login_history.html', {'records': records}) logger = logging.getLogger(__name__) def _validate_password_strength(password: str): common_weak = { 'password', '123456', '123456789', 'qwerty', 'abc123', '111111', '123123', 'password1', 'admin', 'letmein' } if not password or len(password) < 12: raise ValidationError('密码过短,至少需要 12 位') if password.lower() in common_weak: raise ValidationError('密码过于常见,存在安全风险') if re.fullmatch(r'(.)\1{7,}', password): raise ValidationError('密码字符重复过多,存在安全风险') if not re.search(r'[a-z]', password): raise ValidationError('需至少包含一个小写字母') if not re.search(r'[A-Z]', password): raise ValidationError('需至少包含一个大写字母') if not re.search(r'\d', password): raise ValidationError('需至少包含一个数字') if not re.search(r'[^\w\s]', password): raise ValidationError('需至少包含一个特殊字符(如 !@#¥% 等)')