diff --git a/app/paddle_utils.py b/app/paddle_utils.py new file mode 100644 index 00000000..49904e15 --- /dev/null +++ b/app/paddle_utils.py @@ -0,0 +1,57 @@ +""" +Verify incoming webhook from Paddle +Code inspired from https://developer.paddle.com/webhook-reference/verifying-webhooks +""" + +import base64 +import collections + +# PHPSerialize can be found at https://pypi.python.org/pypi/phpserialize +import phpserialize +from Crypto.Hash import SHA1 + +# Crypto can be found at https://pypi.org/project/pycryptodome/ +from Crypto.PublicKey import RSA +from Crypto.Signature import PKCS1_v1_5 + +from app.config import PADDLE_PUBLIC_KEY_PATH + +# Your Paddle public key. +with open(PADDLE_PUBLIC_KEY_PATH) as f: + public_key = f.read() + + +# Convert key from PEM to DER - Strip the first and last lines and newlines, and decode +public_key_encoded = public_key[26:-25].replace("\n", "") +public_key_der = base64.b64decode(public_key_encoded) + + +def verify_incoming_request(form_data: dict) -> bool: + """verify the incoming form_data""" + # copy form data + input_data = form_data.copy() + + signature = input_data["p_signature"] + + # Remove the p_signature parameter + del input_data["p_signature"] + + # Ensure all the data fields are strings + for field in input_data: + input_data[field] = str(input_data[field]) + + # Sort the data + sorted_data = collections.OrderedDict(sorted(input_data.items())) + + # and serialize the fields + serialized_data = phpserialize.dumps(sorted_data) + + # verify the data + key = RSA.importKey(public_key_der) + digest = SHA1.new() + digest.update(serialized_data) + verifier = PKCS1_v1_5.new(key) + signature = base64.b64decode(signature) + if verifier.verify(digest, signature): + return True + return False diff --git a/tests/test_paddle_utils.py b/tests/test_paddle_utils.py new file mode 100644 index 00000000..02c8461b --- /dev/null +++ b/tests/test_paddle_utils.py @@ -0,0 +1,45 @@ +from app.paddle_utils import verify_incoming_request + + +def test_verify_incoming_request(): + # the request comes from Paddle simulation + request_data = { + "alert_id": "1647146853", + "alert_name": "payment_succeeded", + "balance_currency": "EUR", + "balance_earnings": "966.81", + "balance_fee": "16.03", + "balance_gross": "107.37", + "balance_tax": "670.85", + "checkout_id": "8-a367127c071e8a2-cba0a50da3", + "country": "AU", + "coupon": "Coupon 7", + "currency": "USD", + "customer_name": "customer_name", + "earnings": "820.91", + "email": "awyman@example.org", + "event_time": "2019-12-14 18:43:09", + "fee": "0.26", + "ip": "65.220.94.158", + "marketing_consent": "1", + "order_id": "8", + "passthrough": "Example String", + "payment_method": "paypal", + "payment_tax": "0.18", + "product_id": "3", + "product_name": "Example String", + "quantity": "29", + "receipt_url": "https://my.paddle.com/receipt/4/5854e29100fd226-440fa7ba7a", + "sale_gross": "568.82", + "used_price_override": "true", + "p_signature": "CQrBWKnAuhBOWdgu6+upbgpLo38c2oQJVgNHLTNsQoaUHtJgHUXzfUfQdcnD9q3EWZuQtyFXXPkygxx/fMbcu+UTnfxkjyecoHio8w4T858jU4VOy1RPqYy6fqazG1vlngiuYqEdgo8OHT/6oIJAf+NWm1v1iwbpr62rDygzJWZrqTzVSKkESfW8/4goxlN2BWr6eaN/4nKQ4gaHq5ee3/7vMmkrLAQG509x9SK3H0bYvh3pvbWMUhYNz8j+7GZRlXcSCpMKw1nkO/jK4IXKW0rtSwgyVjJhpX+/rt2byaCmWEvP0LtGhrug9xAqMYJ3tDCJmwSk2cXG8rPE7oeBwEEElZrQJdbV+i6Tw5rw9LaqEGrjhSkOapfpINdct5UpKXybIyiRZZ111yhJL081T1rtBqb8L+wsPnHG8GzI1Fg5je98j5aXGQU9hcw5nQN779IJQWNN+GbDQZ+Eleu5c6ZYauxpKzE8s/Vs2a4/70KB6WBK6NKxNSIIoOTumKqnfEiPN0pxZp5MMi2dRW7wu7VqvcLbIEYtCkOLnjxVyko32B6AMIgn8CuHvQp9ScPdNdU6B8dBXhdVfV75iYSwx+ythun5d3f357IecaZep27QQmKR/b7/pv4iMOiHKmFQRz9EKwqQm/3Xg2WS4GA4t1X0nslXMuEeRnX6xTaxbvk=", + } + assert verify_incoming_request(request_data) + + # add a new field in request_data -> verify should fail + request_data["new_field"] = "new_field" + assert not verify_incoming_request(request_data) + + # modify existing field -> verify should fail + request_data["sale_gross"] = "1.23" + assert not verify_incoming_request(request_data)