有限状态机(Finite State Machine,FSM)和消息队列(Message Queue,MQ)是两个强大的工具,结合使用可以有效地处理复杂的业务流程和异步任务。在系统设计中,FSM可以管理复杂的状态转换逻辑,而MQ可以解耦组件并提供可靠的消息传递机制。下面是一个详细的示例,说明如何将有限状态机和消息队列配合使用来实现一个复杂的业务流程。
场景描述假设我们需要实现一个订单处理系统,其中订单可以有多个状态(如“创建”、“支付中”、“已支付”、“发货中”、“已发货”、“已完成”)。我们希望使用FSM来管理订单状态,并使用MQ来处理异步任务,如支付处理、库存检查和发货等。
设计步骤1. 定义状态和事件首先,我们定义订单的各个状态和可能的事件:
状态: 1. Created(创建) 2. PendingPayment(支付中) 3. Paid(已支付) 4. Shipping(发货中) 5. Shipped(已发货) 6. Completed(已完成) 事件: 1. MakePayment(进行支付) 2. PaymentSuccess(支付成功) 3. StartShipping(开始发货) 4. CompleteShipping(发货完成) 5. OrderComplete(订单完成)2. 定义有限状态机
使用FSM管理订单状态:
from transitions import Machine
class OrderFSM:
states = ['Created', 'PendingPayment', 'Paid', 'Shipping', 'Shipped', 'Completed']
def __init__(self, order_id):
self.order_id = order_id
self.machine = Machine(model=self, states=OrderFSM.states, initial='Created')
self.machine.add_transition(trigger='make_payment', source='Created', dest='PendingPayment')
self.machine.add_transition(trigger='payment_success', source='PendingPayment', dest='Paid')
self.machine.add_transition(trigger='start_shipping', source='Paid', dest='Shipping')
self.machine.add_transition(trigger='complete_shipping', source='Shipping', dest='Shipped')
self.machine.add_transition(trigger='order_complete', source='Shipped', dest='Completed')
def on_enter_PendingPayment(self):
# Send a message to the payment service to process the payment
send_message_to_queue('payment_queue', {'order_id': self.order_id})
def on_enter_Shipping(self):
# Send a message to the shipping service to start the shipping process
send_message_to_queue('shipping_queue', {'order_id': self.order_id})
def on_enter_Completed(self):
print(f"Order {self.order_id} has been completed.")
def send_message_to_queue(queue_name, message):
# This function sends a message to the specified message queue
pass 3. 使用消息队列处理异步任务消息队列用于处理支付、库存检查和发货等异步任务。这里假设我们使用RabbitMQ作为消息队列。
import pika
import json
def send_message_to_queue(queue_name, message):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
connection.close()
def on_payment_message(ch, method, properties, body):
message = json.loads(body)
order_id = message['order_id']
# Process payment
process_payment(order_id)
# After processing payment, transition order state
order_fsm = get_order_fsm(order_id)
order_fsm.payment_success()
ch.basic_ack(delivery_tag=method.delivery_tag)
def on_shipping_message(ch, method, properties, body):
message = json.loads(body)
order_id = message['order_id']
# Process shipping
process_shipping(order_id)
# After processing shipping, transition order state
order_fsm = get_order_fsm(order_id)
order_fsm.complete_shipping()
ch.basic_ack(delivery_tag=method.delivery_tag)
def get_order_fsm(order_id):
# Retrieve or create the order FSM instance for the given order_id
return OrderFSM(order_id)
def process_payment(order_id):
# Simulate payment processing
print(f"Processing payment for order {order_id}")
def process_shipping(order_id):
# Simulate shipping processing
print(f"Processing shipping for order {order_id}")
# Set up RabbitMQ consumers
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='payment_queue', durable=True)
channel.queue_declare(queue='shipping_queue', durable=True)
channel.basic_consume(queue='payment_queue', on_message_callback=on_payment_message)
channel.basic_consume(queue='shipping_queue', on_message_callback=on_shipping_message)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming() 4. 示例执行流程创建一个订单,并进入 Created 状态。调用 order_fsm.make_payment(),状态变为 PendingPayment,并发送消息到 payment_queue。消费者接收到 payment_queue 中的消息,处理支付,并调用 order_fsm.payment_success(),状态变为 Paid。调用 order_fsm.start_shipping(),状态变为 Shipping,并发送消息到 shipping_queue。消费者接收到 shipping_queue 中的消息,处理发货,并调用 order_fsm.complete_shipping(),状态变为 Shipped。最后,调用 order_fsm.order_complete(),状态变为 Completed。通过上述流程,FSM和MQ有效地配合在一起,实现了复杂业务流程的管理和异步任务的处理。
网友回复


