131 lines
4.7 KiB
Python
131 lines
4.7 KiB
Python
import datetime
|
|
import io
|
|
import json
|
|
import uuid
|
|
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
|
|
class Transaction():
    """A signed transfer of a card from a sender to a receiver.

    ``sender`` and ``receiver`` are public-key objects exposing
    ``public_bytes`` (and, for the sender, ``verify``); ``signature`` is the
    raw signature bytes produced by ``sign``. The signature covers the
    transaction id, timestamp, card id and the receiver's PEM-encoded public
    key, and is checked against the sender's public key.
    """

    def __init__(self, transactionId=None,
                 timestamp=None,
                 cardId=None, sender=None, receiver=None, authorPrivateKey=None, signature=None,
                 data=None):
        """Build a transaction from explicit fields and/or a serialized dict.

        Args:
            transactionId: Identifier of the transaction. A fresh
                ``uuid.uuid4()`` is generated per instance when omitted.
            timestamp: Creation time. The current UTC time is taken per
                instance when omitted.
            cardId: Identifier of the card being transferred.
            sender: Public key of the sending party.
            receiver: Public key of the receiving party.
            authorPrivateKey: When given, the transaction is signed with it
                right after the fields are populated.
            signature: Pre-existing signature bytes, if any.
            data: Optional mapping (see ``load_from_data``) whose values
                override the explicit field arguments.
        """
        # The defaults are computed here rather than in the signature:
        # default expressions are evaluated once at definition time, which
        # would hand every transaction the same id and timestamp.
        self.transactionId = uuid.uuid4() if transactionId is None else transactionId
        if timestamp is None:
            timestamp = datetime.datetime.now(datetime.timezone.utc)
        self.timestamp = timestamp
        self.cardId = cardId
        self.sender = sender
        self.receiver = receiver
        self.signature = signature
        if data:
            self.load_from_data(data)
        if authorPrivateKey:
            self.sign(authorPrivateKey)

    @staticmethod
    def _public_key_pem(key):
        """Return ``key`` as a PEM (SubjectPublicKeyInfo) text string."""
        return key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ).decode('utf-8')

    @staticmethod
    def _load_public_key(value):
        """Turn PEM text/bytes into a public-key object.

        Values that are neither ``str`` nor ``bytes`` are returned unchanged
        so callers that already hold key objects keep working.
        """
        if isinstance(value, str):
            value = value.encode('utf-8')
        if isinstance(value, (bytes, bytearray)):
            return serialization.load_pem_public_key(bytes(value))
        return value

    @staticmethod
    def _parse_timestamp(text):
        """Parse the ``str(datetime)`` form of a timezone-aware timestamp."""
        try:
            return datetime.datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f%z")
        except ValueError:
            # str(datetime) drops the fractional part when microseconds == 0.
            return datetime.datetime.strptime(text, "%Y-%m-%d %H:%M:%S%z")

    @staticmethod
    def _pss_padding():
        """Return the PSS padding configuration shared by sign and validate."""
        return padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH
        )

    def _signing_payload(self):
        """Return the exact bytes that are signed and later verified.

        Kept in a single place so ``sign`` and ``validate`` can never drift
        apart; key order is fixed by the literal below.
        """
        return json.dumps({
            "transactionId": str(self.transactionId),
            "timestamp": str(self.timestamp),
            "cardId": str(self.cardId),
            "receiver": self._public_key_pem(self.receiver)
        }).encode('utf-8')

    def validate(self):
        """Verify the signature against the sender's public key.

        Raises:
            Transaction.InvalidSignature: If verification fails for any
                reason (including missing sender/receiver/signature). The
                underlying error is attached as ``__cause__``.
        """
        # TODO: validate transactionId
        # TODO: validate timestamp
        # TODO: validate cardId
        # TODO: validate sender
        # TODO: validate receiver
        # TODO: validate signature
        try:
            self.sender.verify(
                self.signature,
                self._signing_payload(),
                self._pss_padding(),
                hashes.SHA256()
            )
        except Exception as e:
            raise self.InvalidSignature("Invalid signature.") from e

    def sign(self, authorPrivateKey):
        """Sign the transaction with ``authorPrivateKey`` and store the result.

        The signature (RSA-PSS with SHA-256) is written to ``self.signature``.
        """
        self.signature = authorPrivateKey.sign(
            self._signing_payload(),
            self._pss_padding(),
            hashes.SHA256()
        )

    def load_from_data(self, data):
        """Populate the transaction from a mapping shaped like ``as_dict()``.

        Args:
            data: Mapping with the keys ``transactionId``, ``timestamp``,
                ``cardId``, ``sender``, ``receiver`` and ``signature``.
                ``sender``/``receiver`` may be PEM text/bytes or key objects;
                ``signature`` may be a hex string or raw bytes.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If an id, timestamp, key or signature is malformed.
        """
        self.transactionId = uuid.UUID(data['transactionId'])
        self.timestamp = self._parse_timestamp(data['timestamp'])
        self.cardId = uuid.UUID(data['cardId'])
        self.sender = self._load_public_key(data['sender'])
        self.receiver = self._load_public_key(data['receiver'])
        signature = data['signature']
        # as_dict() emits the signature as hex; decode it back to raw bytes.
        if isinstance(signature, str):
            signature = bytes.fromhex(signature)
        self.signature = signature

    def as_dict(self):
        """Return a JSON-serializable dict describing the transaction.

        Keys are PEM-encoded and the signature is hex-encoded, so the result
        can be fed back into ``load_from_data``.
        """
        return {
            "transactionId": str(self.transactionId),
            "timestamp": str(self.timestamp),
            "cardId": str(self.cardId),
            "receiver": self._public_key_pem(self.receiver),
            "sender": self._public_key_pem(self.sender),
            "signature": self.signature.hex()
        }

    def __str__(self):
        """Return the transaction serialized as a JSON string."""
        return json.dumps(self.as_dict())

    class Unauthorized(Exception):
        """Error carrying a ``statusCode`` that defaults to 403."""

        def __init__(self, message, statusCode=403):
            super().__init__(message)
            self.statusCode = statusCode

    class Invalid(Exception):
        """Error carrying a ``statusCode`` that defaults to 400."""

        def __init__(self, message, statusCode=400):
            super().__init__(message)
            self.statusCode = statusCode

    class AlreadyFulfilled(Exception):
        """Error carrying a ``statusCode`` that defaults to 400."""

        def __init__(self, message, statusCode=400):
            super().__init__(message)
            self.statusCode = statusCode

    class Abandoned(Exception):
        """Error carrying a ``statusCode`` that defaults to 500."""

        def __init__(self, message, statusCode=500):
            super().__init__(message)
            self.statusCode = statusCode

    class InvalidSignature(Exception):
        """Raised by ``validate`` when the signature cannot be verified."""

        def __init__(self, message, statusCode=400):
            super().__init__(message)
            self.statusCode = statusCode
|