Finish packaging and change to src-based packaging layout, replace caddy with haproxy for performance, and update docker-compose and Dockerfiles for new packaging.

This commit is contained in:
badblocks 2025-05-09 18:39:04 -07:00
parent 959b06c425
commit 762361a21b
210 changed files with 235 additions and 168 deletions

View file

View file

@ -0,0 +1,7 @@
from django.contrib import admin
from .models import TradeOffer, TradeOfferHaveCard, TradeOfferWantCard, TradeAcceptance
admin.site.register(TradeOffer)
admin.site.register(TradeOfferHaveCard)
admin.site.register(TradeOfferWantCard)
admin.site.register(TradeAcceptance)

View file

@ -0,0 +1,9 @@
from django.apps import AppConfig
class TradesConfig(AppConfig):
name = "pkmntrade_club.trades"
def ready(self):
# Implicitly connect signal handlers decorated with @receiver.
import pkmntrade_club.trades.signals

View file

@ -0,0 +1,183 @@
from django import forms
from django.core.exceptions import ValidationError
from .models import TradeOffer, TradeAcceptance
from pkmntrade_club.accounts.models import FriendCode
from pkmntrade_club.cards.models import Card
from django.forms import ModelForm
from pkmntrade_club.trades.models import TradeOfferHaveCard, TradeOfferWantCard
class NoValidationMultipleChoiceField(forms.MultipleChoiceField):
def validate(self, value):
# Override the validation to skip checking against defined choices
pass
class TradeOfferAcceptForm(forms.Form):
friend_code = forms.ModelChoiceField(
queryset=FriendCode.objects.none(),
label="Select a Friend Code to Accept This Trade Offer"
)
def __init__(self, *args, **kwargs):
# Expecting a keyword argument `friend_codes` with the user's friend codes.
friend_codes = kwargs.pop("friend_codes")
super().__init__(*args, **kwargs)
self.fields["friend_code"].queryset = friend_codes
class TradeAcceptanceCreateForm(forms.ModelForm):
"""
Form for creating a TradeAcceptance.
Expects the caller to pass:
- trade_offer: the instance of TradeOffer this acceptance is for.
- friend_codes: a queryset of FriendCode objects for the current user.
- default_friend_code (optional): the user's default FriendCode.
It filters available requested and offered cards based on what's still available.
"""
class Meta:
model = TradeAcceptance
fields = ["accepted_by", "requested_card", "offered_card"]
def __init__(self, *args, trade_offer=None, friend_codes=None, default_friend_code=None, **kwargs):
if trade_offer is None:
raise ValueError("trade_offer must be provided to filter choices.")
super().__init__(*args, **kwargs)
self.trade_offer = trade_offer
if friend_codes is None:
raise ValueError("friend_codes must be provided")
# Set the accepted_by queryset to the user's friend codes.
self.fields["accepted_by"].queryset = friend_codes
# If the user only has one friend code, preset the field and use a HiddenInput.
if friend_codes.count() == 1:
self.initial["accepted_by"] = friend_codes.first().pk
self.fields["accepted_by"].widget = forms.HiddenInput()
# Otherwise, if a default friend code is provided and it is in the queryset, preselect it.
elif default_friend_code and friend_codes.filter(pk=default_friend_code.pk).exists():
self.initial["accepted_by"] = default_friend_code.pk
# Fix: Convert available 'have' cards (from through model) to Card objects.
self.fields["requested_card"].queryset = Card.objects.filter(
pk__in=trade_offer.have_cards_available_qs.values_list("card_id", flat=True)
)
# Similarly for offered_card.
self.fields["offered_card"].queryset = Card.objects.filter(
pk__in=trade_offer.want_cards_available_qs.values_list("card_id", flat=True)
)
def clean(self):
"""
Ensure the instance has its trade_offer set before model-level validation occurs.
This prevents errors when the model clean() method attempts to access trade_offer.
"""
self.instance.trade_offer = self.trade_offer
return super().clean()
class ButtonRadioSelect(forms.RadioSelect):
template_name = "widgets/button_radio_select.html"
class TradeAcceptanceTransitionForm(forms.Form):
state = forms.ChoiceField(widget=forms.HiddenInput())
def __init__(self, *args, instance=None, user=None, **kwargs):
"""
Initializes the form with allowed transitions from the provided instance.
:param instance: A TradeAcceptance instance.
"""
super().__init__(*args, **kwargs)
if instance is None:
raise ValueError("A TradeAcceptance instance must be provided")
self.instance = instance
self.user = user
self.fields["state"].choices = instance.get_allowed_state_transitions(user)
class TradeOfferCreateForm(ModelForm):
# Override the default fields to capture quantity info in the format 'card_id:quantity'
have_cards = NoValidationMultipleChoiceField(widget=forms.SelectMultiple, required=True)
want_cards = NoValidationMultipleChoiceField(widget=forms.SelectMultiple, required=True)
class Meta:
model = TradeOffer
fields = ["want_cards", "have_cards", "initiated_by"]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Populate choices from Card model using the new field 'rarity_level' instead of the removed relation.
cards = Card.objects.order_by("name", "rarity_level")
choices = [(str(card.pk), card.name) for card in cards]
self.fields["have_cards"].choices = choices
self.fields["want_cards"].choices = choices
def clean_have_cards(self):
data = self.data.getlist("have_cards")
parsed = {}
for item in data:
if ':' not in item:
# Ignore any input without a colon.
continue
parts = item.split(':')
card_id = parts[0]
try:
# Only parse quantity when a colon is present.
quantity = int(parts[1])
except ValueError:
raise forms.ValidationError(f"Invalid quantity provided in {item}")
parsed[card_id] = parsed.get(card_id, 0) + quantity
# Validate that each have card does not exceed the max quantity of 20.
for card_id, quantity in parsed.items():
if quantity > 20:
card = Card.objects.get(pk=card_id)
raise forms.ValidationError(
f"Maximum allowed quantity for each have card is 20. Card {card} has {quantity}."
)
# Ensure no more than 20 unique have cards are selected.
if len(parsed) > 20:
raise forms.ValidationError("You can only select a maximum of 20 unique have cards.")
return parsed
def clean_want_cards(self):
data = self.data.getlist("want_cards")
parsed = {}
for item in data:
if ':' not in item:
continue
parts = item.split(':')
card_id = parts[0]
try:
quantity = int(parts[1])
except ValueError:
raise forms.ValidationError(f"Invalid quantity provided in {item}")
parsed[card_id] = parsed.get(card_id, 0) + quantity
# Validate that each want card does not exceed the max quantity of 20.
for card_id, quantity in parsed.items():
if quantity > 20:
# look up card name
card = Card.objects.get(pk=card_id)
raise forms.ValidationError(
f"Maximum allowed quantity for each want card is 20. Card {card} has {quantity}."
)
# Ensure no more than 20 unique want cards are selected.
if len(parsed) > 20:
raise forms.ValidationError("You can only select a maximum of 20 unique want cards.")
return parsed
def save(self, commit=True):
instance = super().save(commit=False)
if commit:
instance.save()
# Clear any existing through model entries in case of update
TradeOfferHaveCard.objects.filter(trade_offer=instance).delete()
TradeOfferWantCard.objects.filter(trade_offer=instance).delete()
# Create through entries for have_cards
for card_id, quantity in self.cleaned_data["have_cards"].items():
card = Card.objects.get(pk=card_id)
TradeOfferHaveCard.objects.create(trade_offer=instance, card=card, quantity=quantity)
# Create through entries for want_cards
for card_id, quantity in self.cleaned_data["want_cards"].items():
card = Card.objects.get(pk=card_id)
TradeOfferWantCard.objects.create(trade_offer=instance, card=card, quantity=quantity)
return instance

View file

@ -0,0 +1,83 @@
# Generated by Django 5.1 on 2025-05-10 01:22
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('accounts', '0001_initial'),
('cards', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='TradeOffer',
fields=[
('id', models.AutoField(primary_key=True, serialize=False)),
('is_closed', models.BooleanField(db_index=True, default=False)),
('hash', models.CharField(editable=False, max_length=9)),
('rarity_icon', models.CharField(max_length=8, null=True)),
('rarity_level', models.IntegerField(null=True)),
('image', models.ImageField(blank=True, null=True, upload_to='trade_offers/')),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('initiated_by', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, related_name='initiated_trade_offers', to='accounts.friendcode')),
],
),
migrations.CreateModel(
name='TradeAcceptance',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('state', models.CharField(choices=[('ACCEPTED', 'Accepted'), ('SENT', 'Sent'), ('RECEIVED', 'Received'), ('THANKED_BY_INITIATOR', 'Thanked by Initiator'), ('THANKED_BY_ACCEPTOR', 'Thanked by Acceptor'), ('THANKED_BY_BOTH', 'Thanked by Both'), ('REJECTED_BY_INITIATOR', 'Rejected by Initiator'), ('REJECTED_BY_ACCEPTOR', 'Rejected by Acceptor')], default='ACCEPTED', max_length=25)),
('hash', models.CharField(blank=True, editable=False, max_length=9)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('accepted_by', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, related_name='trade_acceptances', to='accounts.friendcode')),
('offered_card', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, related_name='accepted_offered', to='cards.card')),
('requested_card', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, related_name='accepted_requested', to='cards.card')),
('trade_offer', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='acceptances', to='trades.tradeoffer')),
],
),
migrations.CreateModel(
name='TradeOfferHaveCard',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('quantity', models.PositiveIntegerField(default=1)),
('qty_accepted', models.PositiveIntegerField(default=0, editable=False)),
('card', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to='cards.card')),
('trade_offer', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='trade_offer_have_cards', to='trades.tradeoffer')),
],
options={
'ordering': ['card__name'],
'unique_together': {('trade_offer', 'card')},
},
),
migrations.AddField(
model_name='tradeoffer',
name='have_cards',
field=models.ManyToManyField(related_name='trade_offers_have', through='trades.TradeOfferHaveCard', to='cards.card'),
),
migrations.CreateModel(
name='TradeOfferWantCard',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('quantity', models.PositiveIntegerField(default=1)),
('qty_accepted', models.PositiveIntegerField(default=0, editable=False)),
('card', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to='cards.card')),
('trade_offer', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='trade_offer_want_cards', to='trades.tradeoffer')),
],
options={
'ordering': ['card__name'],
'unique_together': {('trade_offer', 'card')},
},
),
migrations.AddField(
model_name='tradeoffer',
name='want_cards',
field=models.ManyToManyField(related_name='trade_offers_want', through='trades.TradeOfferWantCard', to='cards.card'),
),
]

View file

@ -0,0 +1 @@
0001_initial

View file

@ -0,0 +1,37 @@
from pkmntrade_club.cards.models import Card
from django.core.exceptions import PermissionDenied
class TradeOfferContextMixin:
def get_context_data(self, **kwargs):
# Start with any context passed in.
context = kwargs.copy()
# Include available cards requirements for multiselect fields.
context.setdefault("cards", Card.objects.all().order_by("name", "rarity_level"))
# Provide friend_codes and selected_friend_code as in TradeOfferCreateView
friend_codes = self.request.user.friend_codes.all()
context["friend_codes"] = friend_codes
if "initiated_by" in self.request.GET:
try:
selected_friend_code = friend_codes.get(pk=self.request.GET.get("initiated_by"))
except friend_codes.model.DoesNotExist:
selected_friend_code = self.request.user.default_friend_code or friend_codes.first()
else:
selected_friend_code = self.request.user.default_friend_code or friend_codes.first()
context["selected_friend_code"] = selected_friend_code
return context
class FriendCodeRequiredMixin:
"""
Mixin to ensure the authenticated user has at least one friend code.
This mixin must be placed after LoginRequiredMixin in the view's inheritance order.
"""
def dispatch(self, request, *args, **kwargs):
# Since LoginRequiredMixin guarantees that request.user is authenticated,
# we assume request.user has the attribute `friend_codes`. If no friend code exists,
# raise a PermissionDenied error.
if not getattr(request.user, 'friend_codes', None) or not request.user.friend_codes.exists():
raise PermissionDenied("No friend codes available for your account.")
return super().dispatch(request, *args, **kwargs)

View file

@ -0,0 +1,468 @@
from django.db import models
from django.core.exceptions import ValidationError
from django.db.models import Q, Count, Prefetch, F, Sum, Max
import hashlib
from pkmntrade_club.cards.models import Card
from pkmntrade_club.accounts.models import FriendCode
from datetime import timedelta
from django.utils import timezone
import uuid
def generate_tradeoffer_hash():
"""
Generates a unique 9-character hash for a TradeOffer.
The last character 'z' indicates its type.
"""
return hashlib.md5(uuid.uuid4().hex.encode("utf-8")).hexdigest()[:8] + "z"
def generate_tradeacceptance_hash():
"""
Generates a unique 9-character hash for a TradeAcceptance.
The last character 'y' indicates its type.
"""
return hashlib.md5(uuid.uuid4().hex.encode("utf-8")).hexdigest()[:8] + "y"
class TradeOfferManager(models.Manager):
def get_queryset(self):
qs = super().get_queryset().select_related(
"initiated_by",
"initiated_by__user",
).prefetch_related(
"trade_offer_have_cards__card",
"trade_offer_want_cards__card",
"acceptances",
"acceptances__accepted_by",
"acceptances__requested_card",
"acceptances__offered_card",
"acceptances__accepted_by__user",
)
return qs.order_by("-updated_at")
class TradeOffer(models.Model):
objects = TradeOfferManager()
id = models.AutoField(primary_key=True)
is_closed = models.BooleanField(default=False, db_index=True)
hash = models.CharField(max_length=9, editable=False)
initiated_by = models.ForeignKey(
"accounts.FriendCode",
on_delete=models.PROTECT,
related_name='initiated_trade_offers'
)
rarity_icon = models.CharField(max_length=8, null=True)
rarity_level = models.IntegerField(null=True)
image = models.ImageField(upload_to='trade_offers/', null=True, blank=True)
want_cards = models.ManyToManyField(
"cards.Card",
related_name='trade_offers_want',
through="TradeOfferWantCard"
)
have_cards = models.ManyToManyField(
"cards.Card",
related_name='trade_offers_have',
through="TradeOfferHaveCard"
)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
def __str__(self):
want_names = ", ".join([x.name for x in self.want_cards.all()])
have_names = ", ".join([x.name for x in self.have_cards.all()])
return f"Want: {want_names} -> Have: {have_names}"
def save(self, *args, **kwargs):
if not self.hash:
self.hash = generate_tradeoffer_hash()
super().save(*args, **kwargs)
def update_rarity_fields(self):
"""
Recalculates and updates the rarity_level and rarity_icon fields based on
the associated have_cards and want_cards.
Enforces that all cards in the trade offer share the same rarity.
Uses the first card's rarity details to update both fields.
"""
# Gather all cards from both sides.
cards = list(self.have_cards.all()) + list(self.want_cards.all())
if not cards:
return
# Enforce same rarity across all cards.
rarity_levels = {card.rarity_level for card in cards}
if len(rarity_levels) > 1:
raise ValidationError("All cards in a trade offer must have the same rarity.")
first_card = cards[0]
if first_card.rarity_level > 5:
raise ValidationError("Cannot trade cards above one-star rarity.")
if self.rarity_level != first_card.rarity_level or self.rarity_icon != first_card.rarity_icon:
self.rarity_level = first_card.rarity_level
self.rarity_icon = first_card.rarity_icon
# Use super().save() here to avoid recursion.
super(TradeOffer, self).save(update_fields=["rarity_level", "rarity_icon"])
@property
def have_cards_available(self):
# Returns the list of have_cards (through objects) that still have available quantity.
return [item for item in self.trade_offer_have_cards.all() if item.quantity > item.qty_accepted]
@property
def want_cards_available(self):
# Returns the list of want_cards (through objects) that still have available quantity.
return [item for item in self.trade_offer_want_cards.all() if item.quantity > item.qty_accepted]
@property
def have_cards_available_qs(self):
# Returns a queryset of TradeOfferHaveCard objects that still have available quantity
return self.trade_offer_have_cards.filter(quantity__gt=F("qty_accepted")).select_related("card")
@property
def want_cards_available_qs(self):
# Returns a queryset of TradeOfferWantCard objects that still have available quantity
return self.trade_offer_want_cards.filter(quantity__gt=F("qty_accepted")).select_related("card")
class TradeOfferHaveCard(models.Model):
"""
Through model for TradeOffer.have_cards.
Represents the card the initiator is offering along with the quantity available.
"""
trade_offer = models.ForeignKey(
TradeOffer,
on_delete=models.CASCADE,
related_name='trade_offer_have_cards',
db_index=True
)
card = models.ForeignKey("cards.Card", on_delete=models.PROTECT, db_index=True)
quantity = models.PositiveIntegerField(default=1)
qty_accepted = models.PositiveIntegerField(default=0, editable=False)
@property
def qty_available(self):
return self.quantity - self.qty_accepted
def __str__(self):
return f"#{self.card.cardnum} {self.card.cardset} {self.card.rarity_icon} {self.card.name}"
def save(self, *args, **kwargs):
self.trade_offer.update_rarity_fields()
super().save(*args, **kwargs)
def delete(self, *args, **kwargs):
trade_offer = self.trade_offer
super().delete(*args, **kwargs)
trade_offer.update_rarity_fields()
class Meta:
unique_together = ("trade_offer", "card")
ordering = ['card__name']
class TradeOfferWantCard(models.Model):
"""
Through model for TradeOffer.want_cards.
Represents the card the initiator is requesting along with the quantity requested.
"""
trade_offer = models.ForeignKey(
TradeOffer,
on_delete=models.CASCADE,
related_name='trade_offer_want_cards'
)
card = models.ForeignKey("cards.Card", on_delete=models.PROTECT)
quantity = models.PositiveIntegerField(default=1)
qty_accepted = models.PositiveIntegerField(default=0, editable=False)
@property
def qty_available(self):
return self.quantity - self.qty_accepted
def __str__(self):
return f"#{self.card.cardnum} {self.card.cardset} {self.card.rarity_icon} {self.card.name}"
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
self.trade_offer.update_rarity_fields()
def delete(self, *args, **kwargs):
trade_offer = self.trade_offer
super().delete(*args, **kwargs)
trade_offer.update_rarity_fields()
class Meta:
unique_together = ("trade_offer", "card")
ordering = ['card__name']
class TradeAcceptance(models.Model):
class AcceptanceState(models.TextChoices):
ACCEPTED = 'ACCEPTED', 'Accepted'
SENT = 'SENT', 'Sent'
RECEIVED = 'RECEIVED', 'Received'
THANKED_BY_INITIATOR = 'THANKED_BY_INITIATOR', 'Thanked by Initiator'
THANKED_BY_ACCEPTOR = 'THANKED_BY_ACCEPTOR', 'Thanked by Acceptor'
THANKED_BY_BOTH = 'THANKED_BY_BOTH', 'Thanked by Both'
REJECTED_BY_INITIATOR = 'REJECTED_BY_INITIATOR', 'Rejected by Initiator'
REJECTED_BY_ACCEPTOR = 'REJECTED_BY_ACCEPTOR', 'Rejected by Acceptor'
# DRY improvement: define active states once as a class-level constant.
POSITIVE_STATES = [
AcceptanceState.ACCEPTED,
AcceptanceState.SENT,
AcceptanceState.RECEIVED,
AcceptanceState.THANKED_BY_INITIATOR,
AcceptanceState.THANKED_BY_ACCEPTOR,
AcceptanceState.THANKED_BY_BOTH,
]
trade_offer = models.ForeignKey(
TradeOffer,
on_delete=models.CASCADE,
related_name='acceptances',
db_index=True
)
accepted_by = models.ForeignKey(
"accounts.FriendCode",
on_delete=models.PROTECT,
related_name='trade_acceptances'
)
requested_card = models.ForeignKey(
"cards.Card",
on_delete=models.PROTECT,
related_name='accepted_requested'
)
offered_card = models.ForeignKey(
"cards.Card",
on_delete=models.PROTECT,
related_name='accepted_offered'
)
state = models.CharField(
max_length=25,
choices=AcceptanceState.choices,
default=AcceptanceState.ACCEPTED
)
hash = models.CharField(max_length=9, editable=False, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
# A mapping for alternate action labels
ALTERNATE_ACTION_LABELS = {
AcceptanceState.REJECTED_BY_INITIATOR: "Reject This Trade",
AcceptanceState.REJECTED_BY_ACCEPTOR: "Reject This Trade",
# Optionally add alternate labels for other states:
AcceptanceState.ACCEPTED: "Accept This Trade Offer",
AcceptanceState.SENT: "Mark Sent",
AcceptanceState.RECEIVED: "Mark Received",
AcceptanceState.THANKED_BY_INITIATOR: "Send Thanks",
AcceptanceState.THANKED_BY_ACCEPTOR: "Send Thanks",
AcceptanceState.THANKED_BY_BOTH: "Send Thanks",
}
ALTERNATE_ACTION_LABELS_2 = {
AcceptanceState.REJECTED_BY_INITIATOR: "Rejected this Trade",
AcceptanceState.REJECTED_BY_ACCEPTOR: "Rejected this Trade",
AcceptanceState.ACCEPTED: "Accepted this Trade",
AcceptanceState.SENT: "Sent the Card",
AcceptanceState.RECEIVED: "Received the Card and Responded",
AcceptanceState.THANKED_BY_INITIATOR: "Sent Thanks",
AcceptanceState.THANKED_BY_ACCEPTOR: "Sent Thanks",
AcceptanceState.THANKED_BY_BOTH: "Sent Thanks",
}
@classmethod
def get_action_label_for_state(cls, state_value):
"""
Returns the alternate action label for the provided state_value.
If no alternate label exists, falls back to the default label.
"""
default = dict(cls.AcceptanceState.choices).get(state_value, state_value)
return cls.ALTERNATE_ACTION_LABELS.get(state_value, default)
@property
def action_label(self):
"""
For the current acceptance state, return the alternate action label.
"""
return self.get_action_label_for_state(self.state)
@property
def next_action_label(self):
"""
Returns what the next action label would be based on the current state.
"""
if self.state == self.AcceptanceState.ACCEPTED:
return self.get_action_label_for_state(self.AcceptanceState.SENT)
elif self.state == self.AcceptanceState.SENT:
return self.get_action_label_for_state(self.AcceptanceState.RECEIVED)
elif self.state == self.AcceptanceState.RECEIVED or self.state == self.AcceptanceState.THANKED_BY_ACCEPTOR or self.state == self.AcceptanceState.THANKED_BY_INITIATOR:
return self.get_action_label_for_state(self.AcceptanceState.THANKED_BY_BOTH)
else:
return None
@classmethod
def get_action_label_for_state_2(cls, state_value):
"""
Returns the alternate action label for the provided state_value.
If no alternate label exists, falls back to the default label.
"""
default = dict(cls.AcceptanceState.choices).get(state_value, state_value)
return cls.ALTERNATE_ACTION_LABELS_2.get(state_value, default)
@property
def action_label_2(self):
"""
For the current acceptance state, return the alternate action label.
"""
return self.get_action_label_for_state_2(self.state)
@property
def is_initiator_state(self):
return self.state in [self.AcceptanceState.SENT.value, self.AcceptanceState.THANKED_BY_INITIATOR.value, self.AcceptanceState.THANKED_BY_BOTH.value]
@property
def is_acceptor_state(self):
return self.state in [self.AcceptanceState.ACCEPTED.value, self.AcceptanceState.RECEIVED.value, self.AcceptanceState.THANKED_BY_ACCEPTOR.value, self.AcceptanceState.THANKED_BY_BOTH.value]
@property
def is_completed(self):
return self.state in {
self.AcceptanceState.RECEIVED,
self.AcceptanceState.THANKED_BY_INITIATOR,
self.AcceptanceState.THANKED_BY_ACCEPTOR,
self.AcceptanceState.THANKED_BY_BOTH,
}
@property
def is_thanked(self):
return self.state == self.AcceptanceState.THANKED_BY_BOTH
@property
def is_rejected(self):
return self.state in {
self.AcceptanceState.REJECTED_BY_INITIATOR,
self.AcceptanceState.REJECTED_BY_ACCEPTOR,
}
@property
def is_completed_or_rejected(self):
return self.is_completed or self.is_rejected
@property
def is_active(self):
return not self.is_completed_or_rejected
def clean(self):
from django.core.exceptions import ValidationError
try:
have_card = self.trade_offer.trade_offer_have_cards.get(card_id=self.requested_card_id)
except TradeOfferHaveCard.DoesNotExist:
raise ValidationError("The requested card must be one of the trade offer's available cards (have_cards).")
try:
want_card = self.trade_offer.trade_offer_want_cards.get(card_id=self.offered_card_id)
except TradeOfferWantCard.DoesNotExist:
raise ValidationError("The offered card must be one of the trade offer's requested cards (want_cards).")
# Only perform these validations on creation (when self.pk is None).
if self.pk is None:
if self.trade_offer.is_closed:
raise ValidationError("This trade offer is closed. No more acceptances are allowed.")
# Use direct comparison with qty_accepted and quantity.
if have_card.qty_accepted >= have_card.quantity:
raise ValidationError("The requested card has no available quantity.")
if want_card.qty_accepted >= want_card.quantity:
raise ValidationError("The offered card has no available quantity.")
def get_step_number(self):
if self.state in [
self.AcceptanceState.THANKED_BY_INITIATOR,
self.AcceptanceState.THANKED_BY_ACCEPTOR,
]:
return 4
elif self.state in [
self.AcceptanceState.THANKED_BY_BOTH,
]:
return 5
elif self.state in [
self.AcceptanceState.REJECTED_BY_INITIATOR,
self.AcceptanceState.REJECTED_BY_ACCEPTOR,
]:
return 0
else:
return next(index for index, choice in enumerate(self.AcceptanceState.choices) if choice[0] == self.state) + 1
def update_state(self, new_state, user):
if new_state not in [choice[0] for choice in self.AcceptanceState.choices]:
raise ValueError(f"'{new_state}' is not a valid state.")
if (new_state == self.AcceptanceState.THANKED_BY_ACCEPTOR and self.state == self.AcceptanceState.THANKED_BY_INITIATOR) or \
(new_state == self.AcceptanceState.THANKED_BY_INITIATOR and self.state == self.AcceptanceState.THANKED_BY_ACCEPTOR):
new_state = self.AcceptanceState.THANKED_BY_BOTH
if self.state in [
self.AcceptanceState.THANKED_BY_BOTH,
self.AcceptanceState.REJECTED_BY_INITIATOR,
self.AcceptanceState.REJECTED_BY_ACCEPTOR
]:
raise ValueError(f"No transitions allowed from the terminal state '{self.state}'.")
allowed = [x for x, y in self.get_allowed_state_transitions(user)]
if new_state not in allowed:
raise ValueError(f"Transition from {self.state} to {new_state} is not allowed.")
self._actioning_user = user
self.state = new_state
self.save(update_fields=["state"])
def save(self, *args, **kwargs):
if not self.hash:
self.hash = generate_tradeacceptance_hash()
super().save(*args, **kwargs)
def __str__(self):
return (f"TradeAcceptance(offer_hash={self.trade_offer.hash}, "
f"accepted_by={self.accepted_by}, "
f"requested_card={self.requested_card}, "
f"offered_card={self.offered_card}, state={self.state})")
def get_allowed_state_transitions(self, user):
if self.trade_offer.initiated_by in user.friend_codes.all():
allowed_transitions = {
self.AcceptanceState.ACCEPTED: {
self.AcceptanceState.SENT,
self.AcceptanceState.REJECTED_BY_INITIATOR,
},
self.AcceptanceState.SENT: {
self.AcceptanceState.REJECTED_BY_INITIATOR,
},
self.AcceptanceState.RECEIVED: {
self.AcceptanceState.THANKED_BY_INITIATOR,
self.AcceptanceState.REJECTED_BY_INITIATOR,
},
self.AcceptanceState.THANKED_BY_INITIATOR: { },
self.AcceptanceState.THANKED_BY_ACCEPTOR: {
self.AcceptanceState.REJECTED_BY_INITIATOR,
self.AcceptanceState.THANKED_BY_BOTH,
},
}
elif self.accepted_by in user.friend_codes.all():
allowed_transitions = {
self.AcceptanceState.ACCEPTED: {
self.AcceptanceState.REJECTED_BY_ACCEPTOR,
},
self.AcceptanceState.SENT: {
self.AcceptanceState.RECEIVED,
self.AcceptanceState.REJECTED_BY_ACCEPTOR,
},
self.AcceptanceState.RECEIVED: {
self.AcceptanceState.THANKED_BY_ACCEPTOR, #allow early thanks (uses THANKED_BY_ACCEPTOR state)
self.AcceptanceState.REJECTED_BY_ACCEPTOR
},
self.AcceptanceState.THANKED_BY_ACCEPTOR: { },
self.AcceptanceState.THANKED_BY_INITIATOR: {
self.AcceptanceState.THANKED_BY_BOTH,
},
}
else:
allowed_transitions = {}
allowed = allowed_transitions.get(self.state, {})
return [(state, self.AcceptanceState(state).label) for state in allowed]

View file

@ -0,0 +1,282 @@
from django.db.models.signals import post_save, post_delete, pre_save
from django.dispatch import receiver
from django.db.models import F
from pkmntrade_club.trades.models import TradeOfferHaveCard, TradeOfferWantCard, TradeAcceptance, TradeOffer
from django.db import transaction
from pkmntrade_club.accounts.models import CustomUser
from datetime import timedelta
from django.utils import timezone
import uuid
import hashlib
from django.core.mail import send_mail
from django.conf import settings
from django.template.loader import render_to_string
from django.contrib.sites.models import Site
from django.core.cache import cache
import logging
POSITIVE_STATES = [
TradeAcceptance.AcceptanceState.ACCEPTED,
TradeAcceptance.AcceptanceState.SENT,
TradeAcceptance.AcceptanceState.RECEIVED,
TradeAcceptance.AcceptanceState.THANKED_BY_INITIATOR,
TradeAcceptance.AcceptanceState.THANKED_BY_ACCEPTOR,
TradeAcceptance.AcceptanceState.THANKED_BY_BOTH,
]
def adjust_qty_for_trade_offer(trade_offer, card, side, delta):
"""
Increment (or decrement) qty_accepted by delta for the given card on the specified side.
"""
if side == 'have':
TradeOfferHaveCard.objects.filter(
trade_offer=trade_offer,
card=card
).update(qty_accepted=F('qty_accepted') + delta)
elif side == 'want':
TradeOfferWantCard.objects.filter(
trade_offer=trade_offer,
card=card
).update(qty_accepted=F('qty_accepted') + delta)
def update_trade_offer_closed_status(trade_offer):
"""
Check if both sides of the trade offer meet the quantity requirement.
Mark the trade_offer as closed if all cards on one side have qty_accepted
greater than or equal to quantity; otherwise, mark it as open.
"""
have_complete = not TradeOfferHaveCard.objects.filter(
trade_offer=trade_offer,
qty_accepted__lt=F('quantity')
).exists()
want_complete = not TradeOfferWantCard.objects.filter(
trade_offer=trade_offer,
qty_accepted__lt=F('quantity')
).exists()
closed = have_complete or want_complete
if trade_offer.is_closed != closed:
trade_offer.is_closed = closed
trade_offer.save(update_fields=["is_closed"])
@receiver(pre_save, sender=TradeAcceptance)
def trade_acceptance_pre_save(sender, instance, **kwargs):
# Skip signal processing during raw fixture load or when saving a new instance
if kwargs.get("raw", False) or instance._state.adding:
return
if instance.pk:
old_instance = TradeAcceptance.objects.get(pk=instance.pk)
instance._old_state = old_instance.state
@receiver(post_save, sender=TradeAcceptance)
def trade_acceptance_post_save(sender, instance, created, **kwargs):
delta = 0
if created:
if instance.state in POSITIVE_STATES:
delta = 1
else:
old_state = getattr(instance, '_old_state', None)
if old_state is not None:
if old_state in POSITIVE_STATES and instance.state not in POSITIVE_STATES:
delta = -1
elif old_state not in POSITIVE_STATES and instance.state in POSITIVE_STATES:
delta = 1
if delta != 0:
trade_offer = instance.trade_offer
adjust_qty_for_trade_offer(trade_offer, instance.requested_card, side='have', delta=delta)
adjust_qty_for_trade_offer(trade_offer, instance.offered_card, side='want', delta=delta)
update_trade_offer_closed_status(trade_offer)
@receiver(post_delete, sender=TradeAcceptance)
def trade_acceptance_post_delete(sender, instance, **kwargs):
if instance.state in POSITIVE_STATES:
delta = -1
trade_offer = instance.trade_offer
adjust_qty_for_trade_offer(trade_offer, instance.requested_card, side='have', delta=delta)
adjust_qty_for_trade_offer(trade_offer, instance.offered_card, side='want', delta=delta)
update_trade_offer_closed_status(trade_offer)
@receiver(post_save, sender=TradeAcceptance)
def trade_acceptance_email_notification(sender, instance, created, **kwargs):
# Only proceed if the update was triggered by an acting user.
if not hasattr(instance, "_actioning_user"):
return
# check if were in debug mode
# if settings.DEBUG:
# print("DEBUG: skipping email notification in debug mode")
# return
acting_user = instance._actioning_user
del instance._actioning_user
state = instance.state
if state == TradeAcceptance.AcceptanceState.ACCEPTED:
state = "accepted"
elif state == TradeAcceptance.AcceptanceState.SENT:
state = "sent"
elif state == TradeAcceptance.AcceptanceState.RECEIVED:
state = "received"
elif state == TradeAcceptance.AcceptanceState.THANKED_BY_INITIATOR:
state = "thanked_by_initiator"
elif state == TradeAcceptance.AcceptanceState.THANKED_BY_ACCEPTOR:
state = "thanked_by_acceptor"
elif state == TradeAcceptance.AcceptanceState.THANKED_BY_BOTH:
state = "thanked_by_both"
elif state == TradeAcceptance.AcceptanceState.REJECTED_BY_INITIATOR:
state = "rejected_by_initiator"
elif state == TradeAcceptance.AcceptanceState.REJECTED_BY_ACCEPTOR:
state = "rejected_by_acceptor"
else:
return
# Determine the non-acting party:
if instance.trade_offer.initiated_by.user.pk == acting_user.pk:
# The initiator made the change; notify the acceptor.
recipient_user = instance.accepted_by.user
elif instance.accepted_by.user.pk == acting_user.pk:
# The acceptor made the change; notify the initiator.
recipient_user = instance.trade_offer.initiated_by.user
else:
return
if not recipient_user.enable_email_notifications:
return
is_initiator = instance.trade_offer.initiated_by.user.pk == acting_user.pk
email_context = {
"has_card": instance.requested_card,
"want_card": instance.offered_card,
"hash": instance.hash,
"acting_user": acting_user.username,
"acting_user_ign": instance.trade_offer.initiated_by.in_game_name if is_initiator else instance.accepted_by.in_game_name,
"recipient_user": recipient_user.username,
"recipient_user_ign": instance.accepted_by.in_game_name if is_initiator else instance.trade_offer.initiated_by.in_game_name,
"acting_user_friend_code": instance.trade_offer.initiated_by.friend_code if is_initiator else instance.accepted_by.friend_code,
"is_initiator": is_initiator,
"domain": "https://" + Site.objects.get_current().domain,
"pk": instance.pk,
}
email_template = "email/trades/trade_update_" + state + ".txt"
email_subject = render_to_string("email/common/subject.txt", email_context)
email_subject += render_to_string("email/trades/trade_update_" + state + "_subject.txt", email_context)
email_body = render_to_string(email_template, email_context)
send_mail(
email_subject,
email_body,
None,
[recipient_user.email],
)
@receiver(post_save, sender=TradeAcceptance)
def trade_acceptance_reputation_update(sender, instance, created, **kwargs):
"""
Update the denormalized reputation score on the user model based on
state transitions for TradeAcceptance.
- THANKED_BY_BOTH: both the initiator and the acceptor receive +1 when transitioning
into this state, and -1 when leaving it.
- REJECTED_BY_INITIATOR: only the acceptor gets -1 when transitioning into it (and +1 when leaving it).
- REJECTED_BY_ACCEPTOR: only the initiator gets -1 when transitioning into it (and +1 when leaving it).
Creation events are ignored because trade acceptances are never created with a terminal state.
"""
if created:
return # No action on creation as terminal states are not expected.
thanks_delta = 0
rejection_delta_initiator = 0 # Delta for the initiator's reputation
rejection_delta_acceptor = 0 # Delta for the acceptor's reputation
old_state = getattr(instance, '_old_state', None)
if old_state is None:
return
# Handle THANKED_BY_BOTH transitions
if old_state != TradeAcceptance.AcceptanceState.THANKED_BY_BOTH and instance.state == TradeAcceptance.AcceptanceState.THANKED_BY_BOTH:
thanks_delta = 1
elif old_state == TradeAcceptance.AcceptanceState.THANKED_BY_BOTH and instance.state != TradeAcceptance.AcceptanceState.THANKED_BY_BOTH:
thanks_delta = -1
# Handle REJECTED_BY_INITIATOR transitions (affects the acceptor)
if old_state != TradeAcceptance.AcceptanceState.REJECTED_BY_INITIATOR and instance.state == TradeAcceptance.AcceptanceState.REJECTED_BY_INITIATOR:
rejection_delta_acceptor = -1
elif old_state == TradeAcceptance.AcceptanceState.REJECTED_BY_INITIATOR and instance.state != TradeAcceptance.AcceptanceState.REJECTED_BY_INITIATOR:
rejection_delta_acceptor = 1
# Handle REJECTED_BY_ACCEPTOR transitions (affects the initiator)
if old_state != TradeAcceptance.AcceptanceState.REJECTED_BY_ACCEPTOR and instance.state == TradeAcceptance.AcceptanceState.REJECTED_BY_ACCEPTOR:
rejection_delta_initiator = -1
elif old_state == TradeAcceptance.AcceptanceState.REJECTED_BY_ACCEPTOR and instance.state != TradeAcceptance.AcceptanceState.REJECTED_BY_ACCEPTOR:
rejection_delta_initiator = 1
# Apply reputation updates:
# For THANKED_BY_BOTH, update both users.
if thanks_delta:
CustomUser.objects.filter(pk=instance.trade_offer.initiated_by.user.pk).update(
reputation_score=F("reputation_score") + thanks_delta
)
CustomUser.objects.filter(pk=instance.accepted_by.user.pk).update(
reputation_score=F("reputation_score") + thanks_delta
)
# For REJECTED_BY_INITIATOR, update only the acceptor.
if rejection_delta_acceptor:
CustomUser.objects.filter(pk=instance.accepted_by.user.pk).update(
reputation_score=F("reputation_score") + rejection_delta_acceptor
)
# For REJECTED_BY_ACCEPTOR, update only the initiator.
if rejection_delta_initiator:
CustomUser.objects.filter(pk=instance.trade_offer.initiated_by.user.pk).update(
reputation_score=F("reputation_score") + rejection_delta_initiator
)
@receiver(post_delete, sender=TradeAcceptance)
def trade_acceptance_reputation_delete(sender, instance, **kwargs):
"""
When a TradeAcceptance is deleted, adjust the reputation score for the
affected user(s) by reversing any reputation changes previously applied.
- If the deleted instance was in THANKED_BY_BOTH: subtract 1 from both parties.
- If it was in REJECTED_BY_INITIATOR: add 1 to the acceptor.
- If it was in REJECTED_BY_ACCEPTOR: add 1 to the initiator.
"""
if instance.state == TradeAcceptance.AcceptanceState.THANKED_BY_BOTH:
CustomUser.objects.filter(pk=instance.trade_offer.initiated_by.user.pk).update(
reputation_score=F("reputation_score") - 1
)
CustomUser.objects.filter(pk=instance.accepted_by.user.pk).update(
reputation_score=F("reputation_score") - 1
)
if instance.state == TradeAcceptance.AcceptanceState.REJECTED_BY_INITIATOR:
CustomUser.objects.filter(pk=instance.accepted_by.user.pk).update(
reputation_score=F("reputation_score") + 1
)
if instance.state == TradeAcceptance.AcceptanceState.REJECTED_BY_ACCEPTOR:
CustomUser.objects.filter(pk=instance.trade_offer.initiated_by.user.pk).update(
reputation_score=F("reputation_score") + 1
)
@receiver(post_save, sender=TradeOfferHaveCard)
@receiver(post_delete, sender=TradeOfferHaveCard)
@receiver(post_save, sender=TradeOfferWantCard)
@receiver(post_delete, sender=TradeOfferWantCard)
@receiver(post_save, sender=TradeAcceptance)
@receiver(post_delete, sender=TradeAcceptance)
def bubble_up_trade_offer_updates(sender, instance, **kwargs):
"""
Bubble up updated_at to the TradeOffer model when related instances change.
Also invalidates any cached image by deleting the file.
"""
trade_offer = getattr(instance, 'trade_offer', None)
if trade_offer and trade_offer.image:
trade_offer.image.delete(save=True) # deleting the image will trigger a save, which updates the updated_at field
elif trade_offer:
trade_offer.save(update_fields=['updated_at'])

View file

@ -0,0 +1,141 @@
from django import template
from math import ceil
from pkmntrade_club.trades.models import TradeAcceptance
register = template.Library()
@register.inclusion_tag('templatetags/trade_offer.html', takes_context=True)
def render_trade_offer(context, offer):
"""
Renders a trade offer including detailed trade acceptance information.
Freezes the through-model querysets to avoid extra DB hits.
"""
trade_offer_have_cards = list(offer.trade_offer_have_cards.select_related('card').all())
trade_offer_want_cards = list(offer.trade_offer_want_cards.select_related('card').all())
acceptances = list(offer.acceptances.select_related('accepted_by__user', 'requested_card', 'offered_card').all())
have_cards_available = [
card for card in trade_offer_have_cards
if card.quantity > card.qty_accepted
]
want_cards_available = [
card for card in trade_offer_want_cards
if card.quantity > card.qty_accepted
]
if not have_cards_available or not want_cards_available:
flipped = True
else:
flipped = False
tag_context = {
'offer_pk': offer.pk,
'flipped': flipped,
'offer_hash': offer.hash,
'rarity_icon': offer.rarity_icon,
'initiated_by_email': offer.initiated_by.user.email,
'initiated_by_username': offer.initiated_by.user.username,
'initiated_reputation': offer.initiated_by.user.reputation_score,
'acceptances': acceptances,
'have_cards_available': have_cards_available,
'want_cards_available': want_cards_available,
'num_cards_available': len(have_cards_available) + len(want_cards_available),
'on_detail_page': context.get("request").path.endswith("trades/"+str(offer.pk)+"/"),
'cache_key': f'trade_offer_{offer.pk}_{offer.updated_at.timestamp()}_{flipped}',
}
context.update(tag_context)
return context
@register.inclusion_tag('templatetags/trade_acceptance.html', takes_context=True)
def render_trade_acceptance(context, acceptance):
"""
Renders a simple trade acceptance view with a single row and simplified header/footer.
"""
tag_context = {
"acceptance": acceptance,
'cache_key': f'trade_acceptance_{acceptance.pk}_{acceptance.updated_at.timestamp()}',
}
context.update(tag_context)
return context
@register.filter
def get_action_label(acceptance, state_value):
"""
Calls the acceptance's method to return the alternate action label.
"""
return acceptance.get_action_label_for_state(state_value)
@register.filter
def action_button_class(state_value):
"""
Returns daisyUI button classes based on the provided state value.
"""
mapping = {
'ACCEPTED': 'btn btn-primary',
'SENT': 'btn btn-info',
'RECEIVED': 'btn btn-info',
'THANKED_BY_INITIATOR': 'btn btn-success',
'THANKED_BY_ACCEPTOR': 'btn btn-success',
'THANKED_BY_BOTH': 'btn btn-success',
'REJECTED_BY_INITIATOR': 'btn btn-error',
'REJECTED_BY_ACCEPTOR': 'btn btn-error',
}
# Return a default style if the state isn't in the mapping.
return mapping.get(state_value, 'btn btn-outline')
@register.inclusion_tag('templatetags/trade_offer_png.html', takes_context=True)
def render_trade_offer_png(context, offer, show_friend_code=False):
CARD_HEIGHT = 32
CARD_WIDTH = 160
HEADER_HEIGHT = 69
FOOTER_HEIGHT = 37
CARD_WIDTH_PADDING = 64
EXPANDED_CARD_WIDTH_PADDING = 80
CARD_COL_GAP = 4
OUTPUT_PADDING = 24 # height padding is handled by the HTML
have_cards_available = offer.have_cards_available
want_cards_available = offer.want_cards_available
num_cards = max(len(have_cards_available), len(want_cards_available))
expanded = (len(have_cards_available) + len(want_cards_available)) > 4
if expanded:
num_cards = ceil(num_cards / 2) # 2 cards per row if expanded
image_height = (num_cards * CARD_HEIGHT) + ((num_cards - 1) * CARD_COL_GAP) + HEADER_HEIGHT + FOOTER_HEIGHT
if expanded:
image_width = (4 * CARD_WIDTH) + EXPANDED_CARD_WIDTH_PADDING
else:
image_width = (2 * CARD_WIDTH) + CARD_WIDTH_PADDING
image_width += OUTPUT_PADDING
image_height += OUTPUT_PADDING # height padding is handled by the HTML, but we need to also calculate it here for og meta tag use
request = context.get("request")
if request.get_host().startswith("localhost"):
base_url = "http://{0}".format(request.get_host())
else:
base_url = "https://{0}".format(request.get_host())
tag_context = {
'offer_pk': offer.pk,
'offer_hash': offer.hash,
'rarity_icon': offer.rarity_icon,
'initiated_by_email': offer.initiated_by.user.email,
'initiated_by_username': offer.initiated_by.user.username,
'have_cards_available': have_cards_available,
'want_cards_available': want_cards_available,
'in_game_name': offer.initiated_by.in_game_name,
'friend_code': offer.initiated_by.friend_code,
'show_friend_code': show_friend_code,
'num_cards_available': len(have_cards_available) + len(want_cards_available),
'expanded': expanded,
'image_width': image_width,
'image_height': image_height,
'base_url': base_url,
'cache_key': f'trade_offer_png_{offer.pk}_{offer.updated_at.timestamp()}_{expanded}',
}
context.update(tag_context)
return context

View file

@ -0,0 +1,925 @@
from django.test import TestCase, Client
from django.urls import reverse
from django.contrib.auth import get_user_model
from django.core.exceptions import ValidationError
from django.http import QueryDict
from pkmntrade_club.accounts.models import FriendCode
from pkmntrade_club.cards.models import Card
from pkmntrade_club.trades.models import (
TradeOffer,
TradeOfferHaveCard,
TradeOfferWantCard,
TradeAcceptance,
)
from pkmntrade_club.trades.forms import (
TradeOfferCreateForm,
TradeAcceptanceCreateForm,
TradeOfferAcceptForm,
TradeAcceptanceTransitionForm,
)
from tests.utils.rarity import RARITY_MAPPING
# ------------------------------------------------------------------------
# Model Tests
# ------------------------------------------------------------------------
class TradeOfferModelTest(TestCase):
def setUp(self):
User = get_user_model()
self.user = User.objects.create_user(
username="testuser", email="test@example.com", password="password"
)
self.friend_code = FriendCode.objects.create(
friend_code="1234-5678-9012-3456", in_game_name="TestInGame", user=self.user
)
# Create cards with the same rarity (valid scenario)
self.card1 = Card.objects.create(
name="Card1", cardset="set1", cardnum=1, style="default",
rarity_icon=RARITY_MAPPING[1], rarity_level=1
)
self.card2 = Card.objects.create(
name="Card2", cardset="set1", cardnum=2, style="default",
rarity_icon=RARITY_MAPPING[1], rarity_level=1
)
# Create a card with a different rarity (to test invalid trade offers)
self.card3 = Card.objects.create(
name="Card3", cardset="set1", cardnum=3, style="default",
rarity_icon=RARITY_MAPPING[8], rarity_level=8
)
# Create a valid trade offer with consistent rarity details
self.trade_offer = TradeOffer.objects.create(initiated_by=self.friend_code)
TradeOfferHaveCard.objects.create(
trade_offer=self.trade_offer, card=self.card1, quantity=2
)
TradeOfferWantCard.objects.create(
trade_offer=self.trade_offer, card=self.card2, quantity=3
)
def test_update_rarity_fields_valid(self):
"""Test update_rarity_fields succeeds with cards sharing the same rarity."""
self.trade_offer.update_rarity_fields()
self.assertEqual(self.trade_offer.rarity_level, 1)
self.assertEqual(self.trade_offer.rarity_icon, "🔷")
def test_update_rarity_fields_invalid(self):
"""If a card with a different rarity is added, update_rarity_fields should raise an error."""
with self.assertRaisesMessage(
ValidationError, "All cards in a trade offer must have the same rarity."
):
TradeOfferHaveCard.objects.create(
trade_offer=self.trade_offer, card=self.card3, quantity=1
)
def test_hash_generation(self):
"""Verify that TradeOffer.hash is generated and 9 characters long ending with 'z'."""
self.assertTrue(self.trade_offer.hash.endswith("z"))
self.assertEqual(len(self.trade_offer.hash), 9)
class TradeAcceptanceModelTest(TestCase):
def setUp(self):
User = get_user_model()
# Create two users for testing state transitions
self.user = User.objects.create_user(
username="acceptuser", email="acc@example.com", password="password"
)
self.friend_code = FriendCode.objects.create(
friend_code="1111-2222-3333-4444", in_game_name="AccInGame", user=self.user
)
self.other_user = User.objects.create_user(
username="initiator", email="init@example.com", password="password"
)
self.initiator_friend_code = FriendCode.objects.create(
friend_code="5555-6666-7777-8888", in_game_name="InitInGame", user=self.other_user
)
# Create two cards (with the same rarity)
self.card1 = Card.objects.create(
name="CardA", cardset="setA", cardnum=1, style="default",
rarity_icon=RARITY_MAPPING[2], rarity_level=2
)
self.card2 = Card.objects.create(
name="CardB", cardset="setA", cardnum=2, style="default",
rarity_icon=RARITY_MAPPING[2], rarity_level=2
)
# Create a trade offer by the initiator.
self.trade_offer = TradeOffer.objects.create(
initiated_by=self.initiator_friend_code
)
TradeOfferHaveCard.objects.create(
trade_offer=self.trade_offer, card=self.card1, quantity=1
)
TradeOfferWantCard.objects.create(
trade_offer=self.trade_offer, card=self.card2, quantity=1
)
# Create an initial acceptance in state 'ACCEPTED'
self.acceptance = TradeAcceptance.objects.create(
trade_offer=self.trade_offer,
accepted_by=self.friend_code,
requested_card=self.card1,
offered_card=self.card2,
state=TradeAcceptance.AcceptanceState.ACCEPTED,
)
def test_invalid_state_transition(self):
"""
Test that an invalid state transition (not allowed by the current state)
raises a ValueError.
"""
# Attempt to transition to a state that is not allowed.
with self.assertRaises(ValueError):
self.acceptance.update_state(
TradeAcceptance.AcceptanceState.THANKED_BY_ACCEPTOR, user=self.user
)
def test_valid_state_transition_initiator(self):
"""
For a user that owns the trade offer (initiator), check that a valid
transition (e.g. from ACCEPTED to SENT) succeeds.
"""
allowed_transitions = dict(
self.acceptance.get_allowed_state_transitions(user=self.other_user)
)
# 'SENT' should be among allowed states for the initiator.
self.assertIn(TradeAcceptance.AcceptanceState.SENT, allowed_transitions)
self.acceptance.update_state(
TradeAcceptance.AcceptanceState.SENT, user=self.other_user
)
self.assertEqual(
self.acceptance.state, TradeAcceptance.AcceptanceState.SENT
)
def test_signal_adjusts_qty_accepted(self):
"""
Test that creation, state change, and deletion of a TradeAcceptance
update qty_accepted counters in through models correctly.
"""
have_through = TradeOfferHaveCard.objects.get(
trade_offer=self.trade_offer, card=self.card1
)
want_through = TradeOfferWantCard.objects.get(
trade_offer=self.trade_offer, card=self.card2
)
# Initially, one active acceptance should set qty_accepted to 1.
self.assertEqual(have_through.qty_accepted, 1)
self.assertEqual(want_through.qty_accepted, 1)
# Change state to a terminal state so that signals decrement the counter.
self.acceptance.state = TradeAcceptance.AcceptanceState.REJECTED_BY_ACCEPTOR
self.acceptance.save()
have_through.refresh_from_db()
want_through.refresh_from_db()
self.assertEqual(have_through.qty_accepted, 0)
self.assertEqual(want_through.qty_accepted, 0)
# Create a new acceptance and then delete it.
new_acceptance = TradeAcceptance.objects.create(
trade_offer=self.trade_offer,
accepted_by=self.friend_code,
requested_card=self.card1,
offered_card=self.card2,
state=TradeAcceptance.AcceptanceState.ACCEPTED,
)
have_through.refresh_from_db()
self.assertEqual(have_through.qty_accepted, 1)
new_acceptance.delete()
have_through.refresh_from_db()
self.assertEqual(have_through.qty_accepted, 0)
# ------------------------------------------------------------------------
# Form Tests
# ------------------------------------------------------------------------
class TradeOfferFormTest(TestCase):
def setUp(self):
User = get_user_model()
self.user = User.objects.create_user(
username="formuser", email="form@example.com", password="password"
)
self.friend_code = FriendCode.objects.create(
friend_code="9999-8888-7777-6666", in_game_name="FormUser", user=self.user
)
# Create two cards with the same rarity details.
self.card1 = Card.objects.create(
name="FormCard1", cardset="formset", cardnum=1, style="default",
rarity_icon=RARITY_MAPPING[3], rarity_level=3
)
self.card2 = Card.objects.create(
name="FormCard2", cardset="formset", cardnum=2, style="default",
rarity_icon=RARITY_MAPPING[3], rarity_level=3
)
def test_trade_offer_create_form_valid(self):
"""
A valid POST using colon-separated quantity strings should succeed.
"""
# Build a QueryDict with multiple values for each list field.
qd = QueryDict('', mutable=True)
qd.setlist("have_cards", [f"{self.card1.pk}:2"])
qd.setlist("want_cards", [f"{self.card2.pk}:3"])
# 'initiated_by' is a normal field so we can update it directly.
qd.update({"initiated_by": self.friend_code.pk})
form = TradeOfferCreateForm(data=qd)
self.assertTrue(form.is_valid())
def test_trade_offer_create_form_invalid_quantity(self):
"""
If quantity cannot be parsed as an integer a ValidationError should be raised.
"""
qd = QueryDict('', mutable=True)
# Provide an invalid quantity ("two" instead of an integer).
qd.setlist("have_cards", [f"{self.card1.pk}:two"])
qd.setlist("want_cards", [f"{self.card2.pk}:3"])
qd.update({"initiated_by": self.friend_code.pk})
form = TradeOfferCreateForm(data=qd)
self.assertFalse(form.is_valid())
self.assertIn("Invalid quantity provided", str(form.errors))
def test_trade_offer_create_form_missing_colon(self):
"""
An entry missing a colon should be ignored.
"""
qd = QueryDict('', mutable=True)
# No colon present in the selections.
qd.setlist("have_cards", [f"{self.card1.pk}"])
qd.setlist("want_cards", [f"{self.card2.pk}"])
qd.update({"initiated_by": self.friend_code.pk})
form = TradeOfferCreateForm(data=qd)
self.assertTrue(form.is_valid())
# Since the entries are ignored, cleaned_data should have empty dictionaries.
self.assertEqual(form.cleaned_data["have_cards"], {})
self.assertEqual(form.cleaned_data["want_cards"], {})
def test_trade_acceptance_create_form(self):
"""Test that the TradeAcceptanceCreateForm filters available cards based on trade offer availability."""
# Create a trade offer with available quantities.
trade_offer = TradeOffer.objects.create(initiated_by=self.friend_code)
TradeOfferHaveCard.objects.create(
trade_offer=trade_offer, card=self.card1, quantity=2
)
TradeOfferWantCard.objects.create(
trade_offer=trade_offer, card=self.card2, quantity=2
)
friend_codes = FriendCode.objects.filter(pk=self.friend_code.pk)
form_data = {
"accepted_by": self.friend_code.pk,
"requested_card": self.card1.pk,
"offered_card": self.card2.pk,
}
form = TradeAcceptanceCreateForm(
data=form_data, trade_offer=trade_offer, friend_codes=friend_codes
)
self.assertTrue(form.is_valid())
instance = form.save()
self.assertEqual(instance.trade_offer, trade_offer)
self.assertEqual(instance.accepted_by, self.friend_code)
def test_trade_offer_accept_form(self):
"""Test that TradeOfferAcceptForm correctly sets the friend_code queryset."""
friend_codes = FriendCode.objects.filter(pk=self.friend_code.pk)
form = TradeOfferAcceptForm(friend_codes=friend_codes)
self.assertEqual(
list(form.fields["friend_code"].queryset), list(friend_codes)
)
def test_trade_acceptance_transition_form(self):
"""Test that the transition form provides only allowed transitions."""
other_user = get_user_model().objects.create_user(
username="transuser", email="trans@example.com", password="password"
)
other_friend_code = FriendCode.objects.create(
friend_code="FC-TRANS", in_game_name="TransUser", user=other_user
)
# Create a trade offer with initiator being our test friend code.
trade_offer = TradeOffer.objects.create(initiated_by=self.friend_code)
TradeOfferHaveCard.objects.create(
trade_offer=trade_offer, card=self.card1, quantity=1
)
TradeOfferWantCard.objects.create(
trade_offer=trade_offer, card=self.card2, quantity=1
)
acceptance = TradeAcceptance.objects.create(
trade_offer=trade_offer,
accepted_by=other_friend_code,
requested_card=self.card1,
offered_card=self.card2,
state=TradeAcceptance.AcceptanceState.ACCEPTED,
)
form = TradeAcceptanceTransitionForm(instance=acceptance, user=other_user)
# Compare the form's state choices with the allowed transitions.
allowed = [choice[0] for choice in acceptance.get_allowed_state_transitions(user=other_user)]
form_choices = [choice[0] for choice in form.fields["state"].choices]
for choice in allowed:
self.assertIn(choice, form_choices)
# ------------------------------------------------------------------------
# View Tests
# ------------------------------------------------------------------------
class TradeViewsTest(TestCase):
def setUp(self):
User = get_user_model()
self.client = Client()
self.user = User.objects.create_user(
username="viewuser", email="view@example.com", password="password"
)
self.friend_code = FriendCode.objects.create(
friend_code="4444-3333-2222-1111", in_game_name="ViewUser", user=self.user
)
self.user.default_friend_code = self.friend_code
self.user.save(update_fields=["default_friend_code"])
self.client.login(username="viewuser", password="password")
# Create sample cards.
self.card1 = Card.objects.create(
name="ViewCard1", cardset="setV", cardnum=1, style="default",
rarity_icon=RARITY_MAPPING[7], rarity_level=7
)
self.card2 = Card.objects.create(
name="ViewCard2", cardset="setV", cardnum=2, style="default",
rarity_icon=RARITY_MAPPING[7], rarity_level=7
)
# Create a trade offer initiated by the logged-in user's friend code.
self.trade_offer = TradeOffer.objects.create(initiated_by=self.friend_code)
TradeOfferHaveCard.objects.create(
trade_offer=self.trade_offer, card=self.card1, quantity=1
)
TradeOfferWantCard.objects.create(
trade_offer=self.trade_offer, card=self.card2, quantity=1
)
def test_trade_offer_create_view_get(self):
"""GET request to TradeOfferCreateView should include friend_codes and cards in context."""
response = self.client.get(reverse("trade_offer_create"))
self.assertEqual(response.status_code, 200)
self.assertIn("friend_codes", response.context)
self.assertIn("cards", response.context)
# When there is only one friend code, the initial value should be preset.
self.assertEqual(
response.context["form"].initial.get("initiated_by"),
self.friend_code.pk,
)
def test_trade_offer_delete_view_permission(self):
"""
The delete view should enforce that only trade offers initiated by one
of the user's friend codes can be deleted.
"""
other_user = get_user_model().objects.create_user(
username="otheruser", email="other@example.com", password="password"
)
other_friend_code = FriendCode.objects.create(
friend_code="FC-OTHER", in_game_name="OtherUser", user=other_user
)
trade_offer_other = TradeOffer.objects.create(initiated_by=other_friend_code)
url = reverse("trade_offer_delete", kwargs={"pk": trade_offer_other.pk})
response = self.client.get(url)
self.assertEqual(response.status_code, 403)
def test_trade_offer_delete_view_close(self):
"""
If a trade offer has active acceptances, the delete view should not delete it.
Instead, if no active acceptances remain it should mark the offer as closed.
"""
# Create a trade offer with an active acceptance.
trade_offer_with_acceptance = TradeOffer.objects.create(initiated_by=self.friend_code)
# Use quantity=2 so the trade offer isn't automatically closed when one acceptance is created
TradeOfferHaveCard.objects.create(
trade_offer=trade_offer_with_acceptance, card=self.card1, quantity=2
)
TradeOfferWantCard.objects.create(
trade_offer=trade_offer_with_acceptance, card=self.card2, quantity=2
)
# Create an acceptance that takes one card from each side
TradeAcceptance.objects.create(
trade_offer=trade_offer_with_acceptance,
accepted_by=self.friend_code,
requested_card=self.card1,
offered_card=self.card2,
state=TradeAcceptance.AcceptanceState.ACCEPTED,
)
delete_url = reverse("trade_offer_delete", kwargs={"pk": trade_offer_with_acceptance.pk})
# --- Patch the view's get_object() method to return our trade offer ---
from pkmntrade_club.trades.views import TradeOfferDeleteView
orig_get_object = TradeOfferDeleteView.get_object
TradeOfferDeleteView.get_object = lambda self: trade_offer_with_acceptance
try:
# First POST: with active acceptance, deletion should not close the offer.
response = self.client.post(delete_url)
trade_offer_with_acceptance.refresh_from_db()
self.assertFalse(trade_offer_with_acceptance.is_closed)
# Now simulate no active acceptances by updating the acceptance state.
acceptance = trade_offer_with_acceptance.acceptances.first()
acceptance.state = TradeAcceptance.AcceptanceState.REJECTED_BY_ACCEPTOR
acceptance.save(update_fields=["state"])
response = self.client.post(delete_url)
trade_offer_with_acceptance.refresh_from_db()
self.assertTrue(trade_offer_with_acceptance.is_closed)
finally:
# Always restore the original method.
TradeOfferDeleteView.get_object = orig_get_object
def test_trade_acceptance_update_view(self):
"""Test updating a trade acceptance via the update view."""
trade_offer_new = TradeOffer.objects.create(initiated_by=self.friend_code)
TradeOfferHaveCard.objects.create(
trade_offer=trade_offer_new, card=self.card1, quantity=1
)
TradeOfferWantCard.objects.create(
trade_offer=trade_offer_new, card=self.card2, quantity=1
)
acceptance = TradeAcceptance.objects.create(
trade_offer=trade_offer_new,
accepted_by=self.friend_code,
requested_card=self.card1,
offered_card=self.card2,
state=TradeAcceptance.AcceptanceState.ACCEPTED,
)
update_url = reverse("trade_acceptance_update", kwargs={"pk": acceptance.pk})
# First, try an invalid state update.
response = self.client.post(update_url, {"state": "INVALID_STATE"})
self.assertEqual(response.status_code, 200)
form = response.context.get("form")
self.assertIsNotNone(form, "Form should be present in the response context.")
self.assertIn(
"state", form.errors,
"Expected an error on the 'state' field when an invalid state is submitted."
)
self.assertTrue(form.errors["state"], "The 'state' field should have error messages.")
# Next, if there is an allowed valid transition, try it.
allowed_states = [choice[0] for choice in acceptance.get_allowed_state_transitions(user=self.user)]
if allowed_states:
valid_state = allowed_states[0]
response = self.client.post(update_url, {"state": valid_state})
self.assertEqual(response.status_code, 302)
class TradeOfferSecurityTests(TestCase):
def setUp(self):
User = get_user_model()
# Create three users for testing various security scenarios
self.user1 = User.objects.create_user(
username="user1", email="user1@example.com", password="password1"
)
self.user2 = User.objects.create_user(
username="user2", email="user2@example.com", password="password2"
)
self.user3 = User.objects.create_user(
username="user3", email="user3@example.com", password="password3"
)
# Create friend codes for each user with correct format
self.fc1 = FriendCode.objects.create(
friend_code="1111-2222-3333-4444", in_game_name="User1Game", user=self.user1
)
self.fc2 = FriendCode.objects.create(
friend_code="5555-6666-7777-8888", in_game_name="User2Game", user=self.user2
)
self.fc3 = FriendCode.objects.create(
friend_code="9999-0000-1111-2222", in_game_name="User3Game", user=self.user3
)
# Create test cards with proper rarity levels
self.card1 = Card.objects.create(
name="SecCard1", cardset="secset", cardnum=1, style="default",
rarity_icon=RARITY_MAPPING[3], rarity_level=3
)
self.card2 = Card.objects.create(
name="SecCard2", cardset="secset", cardnum=2, style="default",
rarity_icon=RARITY_MAPPING[3], rarity_level=3
)
# Create a trade offer by user1
self.trade_offer = TradeOffer.objects.create(initiated_by=self.fc1)
TradeOfferHaveCard.objects.create(
trade_offer=self.trade_offer, card=self.card1, quantity=1
)
TradeOfferWantCard.objects.create(
trade_offer=self.trade_offer, card=self.card2, quantity=1
)
self.client = Client()
def test_unauthorized_trade_offer_deletion(self):
"""Test that users cannot delete trade offers they don't own."""
self.client.login(username="user2", password="password2")
response = self.client.post(
reverse("trade_offer_delete", kwargs={"pk": self.trade_offer.pk})
)
self.assertEqual(response.status_code, 403)
self.assertTrue(TradeOffer.objects.filter(pk=self.trade_offer.pk).exists())
def test_unauthorized_trade_acceptance_update(self):
"""Test that uninvolved users cannot update trade acceptances."""
# Create an acceptance between user2 and user1's offer
acceptance = TradeAcceptance.objects.create(
trade_offer=self.trade_offer,
accepted_by=self.fc2,
requested_card=self.card1,
offered_card=self.card2,
state=TradeAcceptance.AcceptanceState.ACCEPTED,
)
# Try to update the acceptance as user3 (uninvolved)
self.client.login(username="user3", password="password3")
response = self.client.post(
reverse("trade_acceptance_update", kwargs={"pk": acceptance.pk}),
{"state": TradeAcceptance.AcceptanceState.SENT}
)
self.assertEqual(response.status_code, 403)
def test_cross_user_friend_code_manipulation(self):
"""Test that users cannot use other users' friend codes."""
self.client.login(username="user2", password="password2")
# Try to create a trade offer using user1's friend code
response = self.client.get(
reverse("trade_offer_create"),
{
"initiated_by": self.fc1.pk, # User1's friend code
"have_cards": [f"{self.card1.pk}:1"],
"want_cards": [f"{self.card2.pk}:1"],
}
)
self.assertEqual(response.status_code, 200) # Form should fail validation
self.assertFalse(
TradeOffer.objects.filter(initiated_by=self.fc1).count() > 1
)
def test_authenticated_only_views(self):
"""Test that authenticated-only views are properly protected."""
# Test without login
urls_to_test = [
reverse("trade_offer_create"),
reverse("trade_offer_dashboard"),
reverse("trade_acceptance_create", kwargs={"offer_pk": self.trade_offer.pk}),
]
# First ensure we're logged out
self.client.logout()
for url in urls_to_test:
response = self.client.get(url)
self.assertRedirects(
response,
f"/accounts/login/?next={url}",
msg_prefix=f"URL {url} should require authentication"
)
class TradeOfferEdgeCasesTest(TestCase):
def setUp(self):
User = get_user_model()
self.user = User.objects.create_user(
username="edgeuser", email="edge@example.com", password="password"
)
self.friend_code = FriendCode.objects.create(
friend_code="3333-4444-5555-6666", in_game_name="EdgeUser", user=self.user
)
# Create test cards with different rarities using proper levels and icons
self.common_card = Card.objects.create(
name="CommonCard", cardset="edgeset", cardnum=1, style="default",
rarity_icon=RARITY_MAPPING[1], rarity_level=1
)
self.rare_card = Card.objects.create(
name="RareCard", cardset="edgeset", cardnum=2, style="default",
rarity_icon=RARITY_MAPPING[5], rarity_level=5
)
self.crown_card = Card.objects.create(
name="CrownCard", cardset="edgeset", cardnum=3, style="default",
rarity_icon=RARITY_MAPPING[8], rarity_level=8
)
self.client = Client()
self.client.login(username="edgeuser", password="password")
def test_zero_quantity_trade_offer(self):
"""Test that trade offers with zero quantity are handled properly."""
response = self.client.get(
reverse("trade_offer_create"),
{
"initiated_by": self.friend_code.pk,
"have_cards": [f"{self.common_card.pk}:0"],
"want_cards": [f"{self.common_card.pk}:1"],
}
)
self.assertEqual(response.status_code, 200)
self.assertFalse(
TradeOffer.objects.filter(initiated_by=self.friend_code).exists()
)
def test_negative_quantity_trade_offer(self):
"""Test that trade offers with negative quantity are handled properly."""
response = self.client.get(
reverse("trade_offer_create"),
{
"initiated_by": self.friend_code.pk,
"have_cards": [f"{self.common_card.pk}:-1"],
"want_cards": [f"{self.common_card.pk}:1"],
}
)
self.assertEqual(response.status_code, 200)
self.assertFalse(
TradeOffer.objects.filter(initiated_by=self.friend_code).exists()
)
def test_mixed_rarity_trade_offer(self):
"""Test that trade offers with mixed rarity cards are rejected."""
response = self.client.get(
reverse("trade_offer_create"),
{
"initiated_by": self.friend_code.pk,
"have_cards": [f"{self.common_card.pk}:1"],
"want_cards": [f"{self.crown_card.pk}:1"],
}
)
self.assertEqual(response.status_code, 200)
self.assertFalse(
TradeOffer.objects.filter(initiated_by=self.friend_code).exists()
)
def test_duplicate_card_entries(self):
"""Test handling of duplicate card entries in trade offers."""
response = self.client.get(
reverse("trade_offer_create"),
{
"initiated_by": self.friend_code.pk,
"have_cards": [
f"{self.common_card.pk}:1",
f"{self.common_card.pk}:1"
],
"want_cards": [f"{self.common_card.pk}:1"],
}
)
self.assertEqual(response.status_code, 200)
self.assertFalse(
TradeOffer.objects.filter(initiated_by=self.friend_code).exists()
)
class TradeSearchTests(TestCase):
def setUp(self):
User = get_user_model()
self.user = User.objects.create_user(
username="searchuser", email="search@example.com", password="password"
)
self.friend_code = FriendCode.objects.create(
friend_code="7777-8888-9999-0000", in_game_name="SearchUser", user=self.user
)
# Create test cards with proper rarity levels
self.card1 = Card.objects.create(
name="SearchCard1", cardset="sc1", cardnum=1, style="default",
rarity_icon=RARITY_MAPPING[4], rarity_level=4
)
self.card2 = Card.objects.create(
name="SearchCard2", cardset="sc1", cardnum=2, style="default",
rarity_icon=RARITY_MAPPING[4], rarity_level=4
)
self.card3 = Card.objects.create(
name="SearchCard3", cardset="sc1", cardnum=3, style="default",
rarity_icon=RARITY_MAPPING[4], rarity_level=4
)
# Create some trade offers
self.trade_offer1 = TradeOffer.objects.create(initiated_by=self.friend_code)
TradeOfferHaveCard.objects.create(
trade_offer=self.trade_offer1, card=self.card1, quantity=2
)
TradeOfferWantCard.objects.create(
trade_offer=self.trade_offer1, card=self.card2, quantity=1
)
self.trade_offer2 = TradeOffer.objects.create(initiated_by=self.friend_code)
TradeOfferHaveCard.objects.create(
trade_offer=self.trade_offer2, card=self.card2, quantity=1
)
TradeOfferWantCard.objects.create(
trade_offer=self.trade_offer2, card=self.card3, quantity=1
)
self.client = Client()
def test_search_by_have_cards(self):
"""Test searching for trade offers by cards the user has doesn't show offers initiated by the user."""
response = self.client.post(
reverse("trade_offer_search"),
{
"have_cards": [f"{self.card2.pk}:1"],
}
)
self.assertEqual(response.status_code, 200)
self.assertNotContains(response, self.trade_offer1.initiated_by.in_game_name)
self.assertNotContains(response, self.trade_offer2.initiated_by.in_game_name)
def test_search_by_want_cards(self):
"""Test searching for trade offers by cards the user wants doesn't show offers initiated by the user."""
response = self.client.post(
reverse("trade_offer_search"),
{
"want_cards": [f"{self.card1.pk}:1"],
}
)
self.assertEqual(response.status_code, 200)
self.assertNotContains(response, self.trade_offer1.initiated_by.in_game_name)
self.assertNotContains(response, self.trade_offer2.initiated_by.in_game_name)
def test_search_with_invalid_card_id(self):
"""Test search behavior with invalid card IDs."""
response = self.client.post(
reverse("trade_offer_search"),
{
"have_cards": ["999999:1"], # Non-existent card ID
}
)
self.assertEqual(response.status_code, 200)
self.assertNotContains(response, self.trade_offer1.initiated_by.in_game_name)
self.assertNotContains(response, self.trade_offer2.initiated_by.in_game_name)
def test_search_closed_trades(self):
"""Test that closed trades don't appear in search results."""
self.trade_offer1.is_closed = True
self.trade_offer1.save()
response = self.client.post(
reverse("trade_offer_search"),
{
"have_cards": [f"{self.card2.pk}:1"],
}
)
self.assertEqual(response.status_code, 200)
self.assertNotContains(response, self.trade_offer1.initiated_by.in_game_name)
class TradeAcceptanceComplexTests(TestCase):
def setUp(self):
User = get_user_model()
self.initiator = User.objects.create_user(
username="initiator", email="init@example.com", password="password"
)
self.acceptor = User.objects.create_user(
username="acceptor", email="accept@example.com", password="password"
)
self.initiator_fc = FriendCode.objects.create(
friend_code="1234-5678-9012-3456", in_game_name="InitUser", user=self.initiator
)
self.acceptor_fc = FriendCode.objects.create(
friend_code="6543-2109-8765-4321", in_game_name="AcceptUser", user=self.acceptor
)
# Create test cards with proper rarity levels
self.card1 = Card.objects.create(
name="ComplexCard1", cardset="cx1", cardnum=1, style="default",
rarity_icon=RARITY_MAPPING[6], rarity_level=6
)
self.card2 = Card.objects.create(
name="ComplexCard2", cardset="cx1", cardnum=2, style="default",
rarity_icon=RARITY_MAPPING[6], rarity_level=6
)
self.card3 = Card.objects.create(
name="ComplexCard3", cardset="cx1", cardnum=3, style="default",
rarity_icon=RARITY_MAPPING[6], rarity_level=6
)
self.card4 = Card.objects.create(
name="ComplexCard4", cardset="cx1", cardnum=4, style="default",
rarity_icon=RARITY_MAPPING[6], rarity_level=6
)
# Create a trade offer with multiple quantities
self.trade_offer = TradeOffer.objects.create(initiated_by=self.initiator_fc)
TradeOfferHaveCard.objects.create(
trade_offer=self.trade_offer, card=self.card1, quantity=3
)
TradeOfferHaveCard.objects.create(
trade_offer=self.trade_offer, card=self.card3, quantity=1
)
TradeOfferWantCard.objects.create(
trade_offer=self.trade_offer, card=self.card2, quantity=3
)
TradeOfferWantCard.objects.create(
trade_offer=self.trade_offer, card=self.card4, quantity=1
)
self.client = Client()
def test_multiple_acceptances_quantity_limit(self):
"""Test that multiple acceptances cannot exceed the offer's quantity limit."""
self.client.login(username="acceptor", password="password")
# Create first acceptance
response1 = self.client.post(
reverse("trade_acceptance_create", kwargs={"offer_pk": self.trade_offer.pk}),
{
"accepted_by": self.acceptor_fc.pk,
"requested_card": self.card1.pk,
"offered_card": self.card2.pk,
}
)
self.assertEqual(response1.status_code, 302) # Successful creation
# Create second acceptance
response2 = self.client.post(
reverse("trade_acceptance_create", kwargs={"offer_pk": self.trade_offer.pk}),
{
"accepted_by": self.acceptor_fc.pk,
"requested_card": self.card1.pk,
"offered_card": self.card2.pk,
}
)
self.assertEqual(response2.status_code, 302) # Successful creation
# Try to create a fourth acceptance (should fail as only 3 are allowed)
response3 = self.client.post(
reverse("trade_acceptance_create", kwargs={"offer_pk": self.trade_offer.pk}),
{
"accepted_by": self.acceptor_fc.pk,
"requested_card": self.card1.pk,
"offered_card": self.card2.pk,
}
)
self.assertEqual(response3.status_code, 302) # Successful creation
response4 = self.client.post(
reverse("trade_acceptance_create", kwargs={"offer_pk": self.trade_offer.pk}),
{
"accepted_by": self.acceptor_fc.pk,
"requested_card": self.card1.pk,
"offered_card": self.card2.pk,
}
)
self.assertEqual(response4.status_code, 200) # Should fail
self.assertEqual(
self.trade_offer.acceptances.count(), 3,
"Should not allow more acceptances than the quantity limit"
)
def test_complex_state_transitions(self):
"""Test complex state transition scenarios."""
self.client.login(username="acceptor", password="password")
# Create an acceptance
acceptance = TradeAcceptance.objects.create(
trade_offer=self.trade_offer,
accepted_by=self.acceptor_fc,
requested_card=self.card1,
offered_card=self.card2,
state=TradeAcceptance.AcceptanceState.ACCEPTED,
)
# Test invalid state transition sequence
invalid_transitions = [
TradeAcceptance.AcceptanceState.THANKED_BY_ACCEPTOR, # Can't thank before sending
TradeAcceptance.AcceptanceState.RECEIVED, # Can't receive before sending
TradeAcceptance.AcceptanceState.THANKED_BY_BOTH, # Can't thank by both directly
]
for invalid_state in invalid_transitions:
response = self.client.post(
reverse("trade_acceptance_update", kwargs={"pk": acceptance.pk}),
{"state": invalid_state}
)
self.assertEqual(response.status_code, 200) # Should stay on form
acceptance.refresh_from_db()
self.assertEqual(
acceptance.state,
TradeAcceptance.AcceptanceState.ACCEPTED,
f"Invalid transition to {invalid_state} should not be allowed"
)
# Test valid state transition sequence
valid_transitions = [
(self.initiator, TradeAcceptance.AcceptanceState.SENT),
(self.acceptor, TradeAcceptance.AcceptanceState.RECEIVED),
(self.initiator, TradeAcceptance.AcceptanceState.THANKED_BY_INITIATOR),
(self.acceptor, TradeAcceptance.AcceptanceState.THANKED_BY_BOTH),
]
for user, state in valid_transitions:
self.client.login(username=user.username, password="password")
response = self.client.post(
reverse("trade_acceptance_update", kwargs={"pk": acceptance.pk}),
{"state": state}
)
self.assertEqual(response.status_code, 302) # Should redirect on success
acceptance.refresh_from_db()
self.assertEqual(
acceptance.state,
state,
f"Valid transition to {state} should be allowed"
)

View file

@ -0,0 +1,24 @@
from django.urls import path
from .views import (
TradeOfferCreateView,
TradeOfferCreateConfirmView,
TradeOfferAllListView,
TradeOfferDetailView,
TradeAcceptanceCreateView,
TradeAcceptanceUpdateView,
TradeOfferDeleteView,
TradeOfferSearchView,
TradeOfferPNGView,
)
urlpatterns = [
path("create/", TradeOfferCreateView.as_view(), name="trade_offer_create"),
path("create/confirm/", TradeOfferCreateConfirmView.as_view(), name="trade_offer_confirm_create"),
path("", TradeOfferAllListView.as_view(), name="trade_offer_list"),
path("search/", TradeOfferSearchView.as_view(), name="trade_offer_search"),
path("<int:pk>/", TradeOfferDetailView.as_view(), name="trade_offer_detail"),
path("<int:pk>.png", TradeOfferPNGView.as_view(), name="trade_offer_png"),
path("delete/<int:pk>/", TradeOfferDeleteView.as_view(), name="trade_offer_delete"),
path("accept/<int:offer_pk>", TradeAcceptanceCreateView.as_view(), name="trade_acceptance_create"),
path("update/<int:pk>/", TradeAcceptanceUpdateView.as_view(), name="trade_acceptance_update"),
]

View file

@ -0,0 +1,749 @@
from django.template import RequestContext
from django.views.generic import DeleteView, CreateView, ListView, DetailView, UpdateView
from django.views import View
from django.urls import reverse_lazy
from django.http import HttpResponseRedirect
from django.contrib.auth.mixins import LoginRequiredMixin
from django.shortcuts import render
from django.core.exceptions import PermissionDenied, ValidationError
from django.core.paginator import Paginator
from django.contrib import messages
from meta.views import Meta
from .models import TradeOffer, TradeAcceptance
from .forms import (TradeAcceptanceCreateForm, TradeOfferCreateForm, TradeAcceptanceTransitionForm)
from django.template.loader import render_to_string
from pkmntrade_club.trades.templatetags.trade_offer_tags import render_trade_offer_png
from playwright.sync_api import sync_playwright
from django.conf import settings
from .mixins import FriendCodeRequiredMixin
from pkmntrade_club.common.mixins import ReusablePaginationMixin
class TradeOfferCreateView(LoginRequiredMixin, CreateView):
http_method_names = ['get'] # restricts this view to GET only
model = TradeOffer
form_class = TradeOfferCreateForm
template_name = "trades/trade_offer_create.html"
success_url = reverse_lazy("trade_offer_list")
def get_form(self, form_class=None):
form = super().get_form(form_class)
# Restrict the 'initiated_by' choices to friend codes owned by the logged-in user.
form.fields["initiated_by"].queryset = self.request.user.friend_codes.all()
return form
def get_initial(self):
initial = super().get_initial()
initial["have_cards"] = self.request.GET.getlist("have_cards")
initial["want_cards"] = self.request.GET.getlist("want_cards")
if self.request.user.friend_codes.count() == 1:
initial["initiated_by"] = self.request.user.friend_codes.first().pk
return initial
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
from pkmntrade_club.cards.models import Card
# Ensure available_cards is a proper QuerySet
context["cards"] = Card.objects.filter(rarity_level__lte=5).order_by("name", "rarity_level")
friend_codes = self.request.user.friend_codes.all()
if "initiated_by" in self.request.GET:
try:
selected_friend_code = friend_codes.get(pk=self.request.GET.get("initiated_by"))
except friend_codes.model.DoesNotExist:
selected_friend_code = self.request.user.default_friend_code or friend_codes.first()
else:
selected_friend_code = self.request.user.default_friend_code or friend_codes.first()
context["friend_codes"] = friend_codes
context["selected_friend_code"] = selected_friend_code
return context
class TradeOfferAllListView(ReusablePaginationMixin, ListView):
model = TradeOffer
template_name = "trades/trade_offer_all_list.html"
def get_context_data(self, *, object_list=None, **kwargs):
context = super().get_context_data(**kwargs)
request = self.request
show_closed = request.GET.get("show_closed", "false").lower() == "true"
context["show_closed"] = show_closed
queryset = TradeOffer.objects.all()
if show_closed:
queryset = queryset.filter(is_closed=True)
else:
queryset = queryset.filter(is_closed=False)
page_number = self.get_page_number()
self.per_page = 10
paginated_offers, pagination_context = self.paginate_data(queryset, page_number)
context["offers"] = paginated_offers
context["page_obj"] = pagination_context
# Add the expanded flag to the context based on the URL query parameter.
context["expanded"] = request.GET.get("expanded", "false").lower() == "true"
return context
def render_to_response(self, context, **response_kwargs):
if self.request.headers.get("X-Requested-With") == "XMLHttpRequest":
show_closed = self.request.GET.get("show_closed", "false").lower() == "true"
expanded = self.request.GET.get("expanded", "false").lower() == "true"
queryset = TradeOffer.objects.all()
if show_closed:
queryset = queryset.filter(is_closed=True)
else:
queryset = queryset.filter(is_closed=False)
page_number = self.get_page_number()
self.per_page = 10
paginated_offers, pagination_context = self.paginate_data(queryset, page_number)
return render(
self.request,
"trades/_trade_offer_list.html",
{"offers": paginated_offers, "page_obj": pagination_context, "expanded": expanded}
)
return super().render_to_response(context, **response_kwargs)
class TradeOfferDeleteView(LoginRequiredMixin, FriendCodeRequiredMixin, DeleteView):
model = TradeOffer
success_url = reverse_lazy("trade_offer_list")
template_name = "trades/trade_offer_delete.html"
def dispatch(self, request, *args, **kwargs):
self.object = super().get_object()
if self.object.initiated_by_id not in request.user.friend_codes.values_list("id", flat=True):
raise PermissionDenied("You are not authorized to delete or close this trade offer.")
return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
trade_offer = self.object
terminal_states = [
TradeAcceptance.AcceptanceState.THANKED_BY_INITIATOR,
TradeAcceptance.AcceptanceState.THANKED_BY_ACCEPTOR,
TradeAcceptance.AcceptanceState.THANKED_BY_BOTH,
TradeAcceptance.AcceptanceState.REJECTED_BY_INITIATOR,
TradeAcceptance.AcceptanceState.REJECTED_BY_ACCEPTOR,
]
active_acceptances = trade_offer.acceptances.exclude(state__in=terminal_states)
if trade_offer.acceptances.count() == 0:
context["action"] = "delete"
elif trade_offer.acceptances.count() > 0 and not active_acceptances.exists():
context["action"] = "close"
else:
context["action"] = None
return context
def post(self, request, *args, **kwargs):
trade_offer = self.object
terminal_states = [
TradeAcceptance.AcceptanceState.THANKED_BY_INITIATOR,
TradeAcceptance.AcceptanceState.THANKED_BY_ACCEPTOR,
TradeAcceptance.AcceptanceState.THANKED_BY_BOTH,
TradeAcceptance.AcceptanceState.REJECTED_BY_INITIATOR,
TradeAcceptance.AcceptanceState.REJECTED_BY_ACCEPTOR,
]
active_acceptances = trade_offer.acceptances.exclude(state__in=terminal_states)
if active_acceptances.exists():
messages.error(
request,
"Cannot close this trade offer while there are active acceptances. Please reject all acceptances before closing, or finish the trades."
)
context = self.get_context_data()
return self.render_to_response(context)
else:
if trade_offer.acceptances.count() > 0:
trade_offer.is_closed = True
trade_offer.save(update_fields=["is_closed"])
messages.success(request, "Trade offer has been marked as closed.")
return HttpResponseRedirect(self.get_success_url())
else:
messages.success(request, "Trade offer has been deleted.")
return super().delete(request, *args, **kwargs)
class TradeOfferSearchView(ListView):
"""
Reworked trade offer search view using POST.
This view allows users to search active trade offers based on the cards they have and/or want.
The POST parameters (offered_cards and wanted_cards) are expected to be in the format 'card_id:quantity'.
If both types of selections are provided, the resultant queryset must satisfy both conditions.
Offers initiated by any of the user's friend codes are excluded.
When the request is AJAX (via X-Requested-With header), only the search results fragment
(_search_results.html) is rendered. On GET (initial page load), the search results queryset
is empty.
"""
model = TradeOffer
context_object_name = "search_results"
template_name = "trades/trade_offer_search.html"
paginate_by = 10
http_method_names = ["get", "post"]
def parse_selections(self, selection_list):
"""
Parse a list of selections (each formatted as 'card_id:quantity') into a list of tuples.
Defaults the quantity to 1 if missing.
"""
results = []
for item in selection_list:
parts = item.split(":")
try:
card_id = int(parts[0])
except ValueError:
continue # Skip invalid values.
qty = 1
if len(parts) > 1:
try:
qty = int(parts[1])
except ValueError:
qty = 1
results.append((card_id, qty))
return results
#@silk_profile(name="Trade Offer Search- Get Queryset")
def get_queryset(self):
# For a GET request (initial load), return an empty queryset.
if self.request.method == "GET":
return TradeOffer.objects.none()
# Parse the POST data for offered and wanted selections.
have_selections = self.parse_selections(self.request.POST.getlist("have_cards"))
want_selections = self.parse_selections(self.request.POST.getlist("want_cards"))
# If no selections are provided, return an empty queryset.
if not have_selections and not want_selections:
return TradeOffer.objects.none()
qs = TradeOffer.objects.filter(
is_closed=False,
)
if self.request.user.is_authenticated:
qs = qs.exclude(initiated_by__in=self.request.user.friend_codes.all())
# Chain filters for offered selections (i.e. the user "has" cards).
if have_selections:
for card_id, qty in have_selections:
qs = qs.filter(
trade_offer_want_cards__card_id=card_id,
trade_offer_want_cards__quantity__gte=qty,
)
# Chain filters for wanted selections (i.e. the user "wants" cards).
if want_selections:
for card_id, qty in want_selections:
qs = qs.filter(
trade_offer_have_cards__card_id=card_id,
trade_offer_have_cards__quantity__gte=qty,
)
return qs.distinct()
#@silk_profile(name="Trade Offer Search- Post")
def post(self, request, *args, **kwargs):
# For POST, simply process the search through get().
return self.get(request, *args, **kwargs)
#@silk_profile(name="Trade Offer Search- Get Context Data")
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
from pkmntrade_club.cards.models import Card
# Populate available_cards to re-populate the multiselects. Exclude cards with rarity level > 5.
context["cards"] = Card.objects.filter(rarity_level__lte=5).order_by("name", "rarity_level")
if self.request.method == "POST":
context["have_cards"] = self.request.POST.getlist("have_cards")
context["want_cards"] = self.request.POST.getlist("want_cards")
else:
context["have_cards"] = []
context["want_cards"] = []
return context
#@silk_profile(name="Trade Offer Search- Render to Response")
def render_to_response(self, context, **response_kwargs):
"""
Render the AJAX fragment if the request is AJAX; otherwise, render the complete page.
"""
if self.request.headers.get("X-Requested-With") == "XMLHttpRequest":
from django.shortcuts import render
return render(self.request, "trades/_search_results.html", context)
else:
return super().render_to_response(context, **response_kwargs)
class TradeOfferDetailView(DetailView):
"""
Displays the details of a TradeOffer along with its active acceptances.
If the offer is still open and the current user is not its initiator,
an acceptance form is provided to create a new acceptance.
"""
model = TradeOffer
template_name = "trades/trade_offer_detail.html"
#@silk_profile(name="Trade Offer Detail- Get Context Data")
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
trade_offer = self.get_object()
screenshot_mode = self.request.GET.get("screenshot_mode")
if screenshot_mode:
context["show_friend_code"] = trade_offer.initiated_by.user.show_friend_code_on_link_previews
context["screenshot_mode"] = screenshot_mode
# Calculate the number of cards in each category.
num_has = trade_offer.trade_offer_have_cards.count()
num_wants = trade_offer.trade_offer_want_cards.count()
num_cards = max(num_has, num_wants)
# Define the aspect ratio.
aspect_ratio = 1.91
# Calculate a base height using our previous assumptions:
# - 80px per card row (with rows computed as round(num_cards/2))
# - plus 138px for header/footer.
base_height = (round(num_cards / 2) * 56) + 138
# Calculate a base width by assuming two columns of card badges.
# Here we assume each card badge is 80px wide plus the same horizontal offset of 138px.
if (num_wants + num_has) >= 4:
base_width = (4 * 144) + 96
else:
base_width = (2 * 144) + 128
if base_height > base_width:
# The trade-offer card is taller than wide;
# compute the width from the height.
image_height = base_height
image_width = int(round(image_height * aspect_ratio)) + 1
else:
# The trade-offer card is wider than tall;
# compute the height from the width.
image_width = base_width
image_height = int(round(image_width / aspect_ratio))
# Build the meta tags with the computed dimensions.
title = f'Trade Offer from {trade_offer.initiated_by.in_game_name} ({trade_offer.initiated_by.friend_code})'
context["meta"] = Meta(
title=title,
description=f'Has: {", ".join([card.card.name for card in trade_offer.trade_offer_have_cards.all()])}\nWants: {", ".join([card.card.name for card in trade_offer.trade_offer_want_cards.all()])}',
image_object={
"url": f'http://localhost:8000{reverse_lazy("trade_offer_png", kwargs={"pk": trade_offer.pk})}',
"type": "image/png",
"width": image_width,
"height": image_height,
},
twitter_type="summary_large_image",
use_og=True,
use_twitter=True,
use_facebook=True,
use_schemaorg=True,
)
# Define terminal (closed) acceptance states based on our new system:
terminal_states = [
TradeAcceptance.AcceptanceState.THANKED_BY_INITIATOR,
TradeAcceptance.AcceptanceState.THANKED_BY_ACCEPTOR,
TradeAcceptance.AcceptanceState.THANKED_BY_BOTH,
TradeAcceptance.AcceptanceState.REJECTED_BY_INITIATOR,
TradeAcceptance.AcceptanceState.REJECTED_BY_ACCEPTOR,
]
# For example, if you want to separate active from terminal acceptances:
context["acceptances"] = trade_offer.acceptances.all()
# Option 1: Filter active acceptances using the queryset lookup.
context["active_acceptances"] = trade_offer.acceptances.exclude(state__in=terminal_states)
if self.request.user.is_authenticated:
user_friend_codes = self.request.user.friend_codes.all()
# Add context flag and deletion URL if the current user is the initiator
if trade_offer.initiated_by in user_friend_codes:
context["is_initiator"] = True
context["delete_close_url"] = reverse_lazy("trade_offer_delete", kwargs={"pk": trade_offer.pk})
else:
context["is_initiator"] = False
# Determine the user's default friend code (or fallback as needed).
default_friend_code = self.request.user.default_friend_code or user_friend_codes.first()
# If the current user is not the initiator and the offer is open, allow a new acceptance.
if trade_offer.initiated_by not in user_friend_codes and not trade_offer.is_closed:
context["acceptance_form"] = TradeAcceptanceCreateForm(
trade_offer=trade_offer,
friend_codes=user_friend_codes,
default_friend_code=default_friend_code
)
else:
context["is_initiator"] = False
context["delete_close_url"] = None
context["acceptance_form"] = None
return context
class TradeAcceptanceCreateView(LoginRequiredMixin, FriendCodeRequiredMixin, CreateView):
"""
View to create a new TradeAcceptance.
The URL should provide 'offer_pk' so that the proper TradeOffer can be identified.
"""
model = TradeAcceptance
form_class = TradeAcceptanceCreateForm
template_name = "trades/trade_acceptance_create.html"
def dispatch(self, request, *args, **kwargs):
self.trade_offer = self.get_trade_offer()
return super().dispatch(request, *args, **kwargs)
def get_trade_offer(self):
return TradeOffer.objects.get(pk=self.kwargs['offer_pk'])
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
if (self.trade_offer.initiated_by_id in
self.request.user.friend_codes.values_list("id", flat=True) or
self.trade_offer.is_closed):
raise PermissionDenied("You cannot accept this trade offer.")
kwargs['trade_offer'] = self.trade_offer
kwargs['friend_codes'] = self.request.user.friend_codes.all()
return kwargs
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["trade_offer"] = self.trade_offer
return context
def form_valid(self, form):
form.instance.trade_offer = self.trade_offer
# Set the actioning user before saving
form.instance._actioning_user = self.request.user
self.object = form.save()
return HttpResponseRedirect(self.get_success_url())
def form_invalid(self, form):
"""
If the form submission includes a 'next' URL (sent as a hidden field from the detail page),
render the trade offer detail template for a better UX. Otherwise, fall back to the default
CreateView behavior.
"""
next_url = self.request.POST.get("next")
if next_url:
friend_codes = self.request.user.friend_codes.all()
is_initiator = self.trade_offer.initiated_by in friend_codes
context = {
"object": self.trade_offer,
"trade_offer": self.trade_offer,
"acceptance_form": form,
"friend_codes": friend_codes,
"is_initiator": is_initiator,
"delete_close_url": reverse_lazy("trade_offer_delete", kwargs={"pk": self.trade_offer.pk}) if is_initiator else None,
}
# Render the detail page with the form errors
return render(self.request, "trades/trade_offer_detail.html", context)
return super().form_invalid(form)
def get_success_url(self):
return reverse_lazy("trade_acceptance_update", kwargs={"pk": self.object.pk})
class TradeAcceptanceUpdateView(LoginRequiredMixin, FriendCodeRequiredMixin, UpdateView):
"""
View to update the state of an existing TradeAcceptance.
The allowed state transitions are provided via the form.
"""
model = TradeAcceptance
form_class = TradeAcceptanceTransitionForm
template_name = "trades/trade_acceptance_update.html"
def dispatch(self, request, *args, **kwargs):
self.object = self.get_object()
friend_codes = request.user.friend_codes.values_list("id", flat=True)
if (self.object.accepted_by_id not in friend_codes and
self.object.trade_offer.initiated_by_id not in friend_codes):
raise PermissionDenied("You are not authorized to update this acceptance.")
return super().dispatch(request, *args, **kwargs)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
# Pass the current instance to the form so it can set proper allowed transitions.
kwargs["instance"] = self.object
kwargs["user"] = self.request.user
return kwargs
def form_valid(self, form):
new_state = form.cleaned_data["state"]
try:
# Try to cast new_state to the enum member
valid_state = TradeAcceptance.AcceptanceState(new_state)
except ValueError:
form.add_error("state", "Invalid state transition.")
return self.form_invalid(form)
try:
form.instance.update_state(valid_state, self.request.user)
except ValueError as e:
form.add_error("state", str(e))
return self.form_invalid(form)
return HttpResponseRedirect(self.get_success_url())
def get_success_url(self):
return reverse_lazy("trade_acceptance_update", kwargs={"pk": self.object.pk})
class TradeOfferPNGView(View):
"""
Generate a PNG screenshot of the rendered trade offer detail page using Playwright.
This view uses PostgreSQL advisory locks to ensure that only one generation process
runs at a time for a given TradeOffer. The generated PNG is then cached in the
TradeOffer model's `image` field (assumed to be an ImageField).
"""
def get_lock_key(self, trade_offer_id):
# Use the trade_offer_id as the lock key; adjust if needed.
return trade_offer_id
def get(self, request, *args, **kwargs):
from django.shortcuts import get_object_or_404
from django.http import HttpResponse
from django.core.files.base import ContentFile
trade_offer = get_object_or_404(TradeOffer, pk=kwargs['pk'])
# If the image is already generated and stored, serve it directly.
if trade_offer.image and not request.GET.get("debug"):
trade_offer.image.open()
return HttpResponse(trade_offer.image.read(), content_type="image/png")
# Acquire PostgreSQL advisory lock to prevent concurrent generation.
from django.db import connection
lock_key = self.get_lock_key(trade_offer.pk)
with connection.cursor() as cursor:
cursor.execute("SELECT pg_advisory_lock(%s)", [lock_key])
try:
# Double-check if the image was generated while waiting for the lock.
trade_offer.refresh_from_db()
if trade_offer.image and not request.GET.get("debug"):
trade_offer.image.open()
return HttpResponse(trade_offer.image.read(), content_type="image/png")
tag_context = render_trade_offer_png(
{'request': request}, trade_offer, show_friend_code=trade_offer.initiated_by.user.show_friend_code_on_link_previews
)
image_width = tag_context.get('image_width')
image_height = tag_context.get('image_height')
if not image_width or not image_height:
raise ValueError("Could not determine image dimensions from tag_context")
html = render_to_string(
"templatetags/trade_offer_png.html",
context=tag_context,
request=request
)
# if query string has "debug", render the HTML instead of the PNG
if request.GET.get("debug") == "html":
return render(request, "templatetags/trade_offer_png.html", tag_context)
with sync_playwright() as p:
browser = p.chromium.launch(
headless=True,
args=[
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-dev-shm-usage",
"--disable-accelerated-2d-canvas",
"--disable-gpu",
"--no-zygote",
"--disable-audio-output",
"--disable-webgl",
"--no-first-run",
]
)
context_browser = browser.new_context(viewport={"width": image_width, "height": image_height})
page = context_browser.new_page()
page.on("console", lambda msg: print(f"Console {msg.type}: {msg.text}"))
page.on("pageerror", lambda err: print(f"Page error: {err}"))
page.on("requestfailed", lambda req: print(f"Failed to load: {req.url} - {req.failure.error_text}"))
page.set_content(html, wait_until="networkidle")
element = page.wait_for_selector(".trade-offer-card-screenshot")
screenshot_bytes = element.screenshot(type="png", omit_background=True)
browser.close()
# Save the generated PNG to the TradeOffer model (requires an ImageField named `image`).
filename = f"trade_offer_{trade_offer.pk}.png"
trade_offer.image.save(filename, ContentFile(screenshot_bytes))
trade_offer.save(update_fields=["image"])
return HttpResponse(screenshot_bytes, content_type="image/png")
finally:
# Release the advisory lock.
with connection.cursor() as cursor:
cursor.execute("SELECT pg_advisory_unlock(%s)", [lock_key])
class TradeOfferCreateConfirmView(LoginRequiredMixin, View):
"""
Processes a two-step create for TradeOffer; on confirmation,
commits the offer and shows form errors if any occur.
"""
def post(self, request, *args, **kwargs):
if "confirm" in request.POST:
return self._commit_offer(request)
elif "edit" in request.POST:
return self._redirect_to_edit(request)
elif "preview" in request.POST:
return self._preview_offer(request)
else:
return self._preview_offer(request)
def _commit_offer(self, request):
"""
Commits the offer after confirmation. Any model ValidationError (for example,
due to mismatched card rarities) is caught and added to the form errors so that
it shows up in trade_offer_create.html.
"""
# Instantiate the form with POST data.
form = TradeOfferCreateForm(request.POST)
# Ensure that the 'initiated_by' queryset is limited to the user's friend codes.
form.fields["initiated_by"].queryset = request.user.friend_codes.all()
if form.is_valid():
try:
trade_offer = form.save()
except ValidationError as error:
form.add_error(None, error)
# Update the form's initial data so the template can safely reference have_cards/want_cards.
form.initial = {
"have_cards": request.POST.getlist("have_cards"),
"want_cards": request.POST.getlist("want_cards"),
"initiated_by": request.POST.get("initiated_by"),
}
# Supply additional context required by trade_offer_create.html.
from pkmntrade_club.cards.models import Card
context = {
"form": form,
"friend_codes": request.user.friend_codes.all(),
"selected_friend_code": (
request.user.default_friend_code or request.user.friend_codes.first()
),
"cards": Card.objects.all().order_by("name", "rarity_level"),
}
return render(request, "trades/trade_offer_create.html", context)
messages.success(request, "Trade offer created successfully!")
return HttpResponseRedirect(reverse_lazy("trade_offer_detail", kwargs={"pk": trade_offer.pk}))
else:
# When the form is not valid, update its initial data as well:
form.initial = {
"have_cards": request.POST.getlist("have_cards"),
"want_cards": request.POST.getlist("want_cards"),
"initiated_by": request.POST.get("initiated_by"),
}
from pkmntrade_club.cards.models import Card
context = {
"form": form,
"friend_codes": request.user.friend_codes.all(),
"selected_friend_code": (
request.user.default_friend_code or request.user.friend_codes.first()
),
"cards": Card.objects.all().order_by("name", "rarity_level"),
}
return render(request, "trades/trade_offer_create.html", context)
def _redirect_to_edit(self, request):
query_params = request.POST.copy()
query_params.pop("csrfmiddlewaretoken", None)
query_params.pop("edit", None)
query_params.pop("confirm", None)
query_params.pop("preview", None)
from django.urls import reverse
base_url = reverse("trade_offer_create")
url_with_params = f"{base_url}?{query_params.urlencode()}"
return HttpResponseRedirect(url_with_params)
def _preview_offer(self, request):
form = TradeOfferCreateForm(request.POST)
form.fields["initiated_by"].queryset = request.user.friend_codes.all()
if not form.is_valid():
# Set initial values required by the template.
form.initial = {
"have_cards": request.POST.getlist("have_cards"),
"want_cards": request.POST.getlist("want_cards"),
"initiated_by": request.POST.get("initiated_by"),
}
from pkmntrade_club.cards.models import Card
context = {
"form": form,
"friend_codes": request.user.friend_codes.all(),
"selected_friend_code": request.user.default_friend_code or request.user.friend_codes.first(),
"cards": Card.objects.all().order_by("name", "rarity_level"),
}
return render(request, "trades/trade_offer_create.html", context)
# Parse the card selections for "have" and "want" cards.
have_selections = self._parse_card_selections("have_cards")
want_selections = self._parse_card_selections("want_cards")
from pkmntrade_club.cards.models import Card
have_cards_ids = [card_id for card_id, _ in have_selections]
cards_have_qs = Card.objects.filter(pk__in=have_cards_ids)
cards_have_dict = {card.pk: card for card in cards_have_qs}
# Define a dummy wrapper for a trade offer card entry.
class DummyOfferCard:
def __init__(self, card, quantity):
self.card = card
self.quantity = quantity
self.qty_accepted = 0
have_offer_cards = []
for card_id, quantity in have_selections:
card = cards_have_dict.get(card_id)
if card:
have_offer_cards.append(DummyOfferCard(card, quantity))
want_cards_ids = [card_id for card_id, _ in want_selections]
cards_want_qs = Card.objects.filter(pk__in=want_cards_ids)
cards_want_dict = {card.pk: card for card in cards_want_qs}
want_offer_cards = []
for card_id, quantity in want_selections:
card = cards_want_dict.get(card_id)
if card:
want_offer_cards.append(DummyOfferCard(card, quantity))
# Mimic a related manager's all() method.
class DummyManager:
def __init__(self, items):
self.items = items
def all(self):
return self.items
# Create a dummy TradeOffer object with properties required by the render_trade_offer tag.
class DummyTradeOffer:
pass
dummy_trade_offer = DummyTradeOffer()
dummy_trade_offer.pk = 0 # a placeholder primary key
dummy_trade_offer.hash = "preview"
dummy_trade_offer.rarity_icon = ""
dummy_trade_offer.trade_offer_have_cards = DummyManager(have_offer_cards)
dummy_trade_offer.trade_offer_want_cards = DummyManager(want_offer_cards)
dummy_trade_offer.acceptances = DummyManager([]) # no acceptances in preview
dummy_trade_offer.initiated_by = form.cleaned_data["initiated_by"]
# Pass along the POST data so that hidden inputs can be re-generated.
context = {
"dummy_trade_offer": dummy_trade_offer,
"post_data": request.POST,
}
return render(request, "trades/trade_offer_confirm_create.html", context)
def _parse_card_selections(self, key):
"""
Parses card selections from POST data for a given key (e.g., 'have_cards' or 'want_cards').
Selections are expected in the format 'card_id:quantity', defaulting quantity to 1 if missing.
Returns a list of (card_id, quantity) tuples.
"""
selections = self.request.POST.getlist(key)
results = []
for selection in selections:
parts = selection.split(":")
try:
card_id = int(parts[0])
except (ValueError, IndexError):
continue
quantity = 1
if len(parts) > 1:
try:
quantity = int(parts[1])
except ValueError:
pass
results.append((card_id, quantity))
return results