秒杀系统—2.第一版初步实现的技术文档
大纲
1.在秒杀运营服务中开发配置秒杀活动接口
2.创建独立的通用工程封装Jedis客户端管理组件
3.对秒杀场次和秒杀商品进行DB + 缓存双写
4.新增秒杀商品时发送消息到RocketMQ
5.消费秒杀商品新增消息时渲染秒杀页面
6.对秒杀商品的库存进行初始化
7.基于Lua开发OpenResty中内嵌的限流Lua脚本
8.实现秒杀抢购流程的核心代码
9.实现秒杀系统高可用架构的伪代码
1.在秒杀运营服务中开发配置秒杀活动接口
seckill-operation模块主要有两个接口:添加秒杀场次 + 添加秒杀商品。
//简化版的秒杀运营服务的接口,所有接口都是GET方式
@RestController
@RequestMapping("/seckill/operation")
public class SeckillOperationController {//秒杀场次Service组件@Autowiredprivate SeckillSessionService seckillSessionService;//秒杀商品Service组件@Autowiredprivate SeckillProductService seckillProductService;//增加秒杀场次的接口@GetMapping("/session/add")public String addSeckillSession(SeckillSession seckillSession) {seckillSessionService.add(seckillSession);return "success";}//增加秒杀场次下商品的接口@GetMapping("/product/add")public String addSeckillProduct(SeckillProduct seckillProduct) {seckillProductService.add(seckillProduct);return "success";}
}//秒杀场次
@Data
public class SeckillSession {//秒杀场次idprivate Long id;//秒杀场次日期private String sessionDate;//秒杀场次时间private String sessionTime;
}//秒杀商品
@Data
public class SeckillProduct {//秒杀商品idprivate Long id;//秒杀场次idprivate Long sessionId;//商品idprivate Long productId;//秒杀价格private Double seckillPrice;//秒杀库存数量private Long seckillStock;
}//秒杀场次Service组件
@Service
public class SeckillSessionServiceImpl implements SeckillSessionService {//秒杀场次DAO组件@Autowiredprivate SeckillSessionDAO seckillSessionDAO;//增加秒杀场次public void add(SeckillSession seckillSession) {seckillSessionDAO.add(seckillSession);}
}//秒杀商品Service组件
@Service
public class SeckillProductServiceImpl implements SeckillProductService {//秒杀商品DAO组件@Autowiredprivate SeckillProductDAO seckillProductDAO;//增加秒杀商品public void add(SeckillProduct seckillProduct) {seckillProductDAO.add(seckillProduct);}
}//秒杀场次DAO组件
@Repository
public class SeckillSessionDAOImpl implements SeckillSessionDAO {//秒杀场次Mapper组件@Autowiredprivate SeckillSessionMapper seckillSessionMapper;//增加秒杀场次public void add(SeckillSession seckillSession) {seckillSessionMapper.insert(seckillSession);}
}//秒杀商品DAO组件
@Repository
public class SeckillProductDAOImpl implements SeckillProductDAO {//秒杀商品Mapper组件@Autowiredprivate SeckillProductMapper seckillProductMapper;//增加秒杀商品public void add(SeckillProduct seckillProduct) {seckillProductMapper.insert(seckillProduct);}
}//秒杀场次Mapper组件
@Mapper
public interface SeckillSessionMapper {//插入秒杀场次@Insert("INSERT INTO seckill_session(session_date,session_time) " + "VALUES(#{sessionDate},#{sessionTime})")@Options(useGeneratedKeys = true, keyColumn = "id", keyProperty = "id")void insert(SeckillSession seckillSession);
}//秒杀商品Mapper组件
@Mapper
public interface SeckillProductMapper {//插入秒杀商品@Insert("INSERT INTO seckill_product(session_id,product_id,seckill_price,seckill_stock) " + "VALUES(#{sessionId},#{productId},#{seckillPrice},#{seckillStock})")@Options(useGeneratedKeys = true, keyColumn = "id", keyProperty = "id")void insert(SeckillProduct seckillProduct);
}
2.创建独立的通用工程封装Jedis客户端管理组件
为了避免Redis采用主从复制架构时主节点宕机可能导致的超卖问题,假设部署的Redis节点都是Master,不做主从复制,多主支持库存分片。
seckill-common模块封装了一个管理Jedis客户端的组件:
//管理单个Jedis实例的组件
public class JedisManager {//一个Redis实例在这里就是一个Jedis实例private ConcurrentHashMap<String, Jedis> jedisMap = new ConcurrentHashMap<String, Jedis>();//管理组件自己本身是单例private JedisManager() {}static class Singleton {static JedisManager instance = new JedisManager();}public static JedisManager getInstance() {return Singleton.instance;}//获取Jedis实例,同时进行缓存public Jedis getJedis(String host, Integer port) {String cacheKey = host + port;if (jedisMap.get(cacheKey) == null) {synchronized(this) {if (jedisMap.get(cacheKey) == null) {Jedis jedis = new Jedis(host, port);jedisMap.put(cacheKey, jedis);}}}return jedisMap.get(cacheKey);}//获取默认的jedis实例public Jedis getJedis() {return getJedis("127.0.0.1", 6379);}
}
3.对秒杀场次和秒杀商品进行DB + 缓存双写
先写MySQL,再写Redis。
//秒杀场次Service组件
@Service
public class SeckillSessionServiceImpl implements SeckillSessionService {//秒杀场次DAO组件@Autowiredprivate SeckillSessionDAO seckillSessionDAO;//增加秒杀场次public void add(SeckillSession seckillSession) {//先写MySQLseckillSessionDAO.add(seckillSession);//再写RedisJedisManager jedisManager = JedisManager.getInstance();Jedis jedis = jedisManager.getJedis();jedis.lpush("seckill::sessions::" + seckillSession.getSessionDate(), JSONObject.toJSONString(seckillSession));}
}//秒杀商品Service组件
@Service
public class SeckillProductServiceImpl implements SeckillProductService {//秒杀商品DAO组件@Autowiredprivate SeckillProductDAO seckillProductDAO;//增加秒杀商品public void add(SeckillProduct seckillProduct) {//先写MySQLseckillProductDAO.add(seckillProduct);//再写RedisJedisManager jedisManager = JedisManager.getInstance();Jedis jedis = jedisManager.getJedis();jedis.lpush("seckill::products::" + seckillProduct.getSessionId(), JSONObject.toJSONString(seckillProduct));}
}
4.新增秒杀商品时发送消息到RocketMQ
需要在seckill-common模块下封装一个RocketMQ生产者和消费者的单例。其实秒杀系统中,基本都是靠MQ来进行解耦,很少会直接发起调用请求。
//封装一个RocketMQ生产者单例类
public class RocketMQProducer {private DefaultMQProducer producer;private static String producerGroup;private RocketMQProducer(String producerGroup) {try {this.producer = new DefaultMQProducer(producerGroup);this.producer.setNamesrvAddr("localhost:9876");this.producer.setSendMsgTimeout(60 * 1000);this.producer.start();} catch(MQClientException e) {System.err.println("初始化RocketMQ生产者失败:" + e);}}private static class Singleton {static RocketMQProducer instance = new RocketMQProducer(producerGroup);}//设置生产者分组名称public static void setProducerGroup(String producerGroup) {RocketMQProducer.producerGroup = producerGroup;}//获取单例public static RocketMQProducer getInstance() {return Singleton.instance;}//获取MQ生产者public DefaultMQProducer getProducer() {return producer;}
}//封装一个RocketMQ消费者单例类
public class RocketMQConsumer {private static String consumerGroup;private static String topic;private static MessageListenerConcurrently listener;private RocketMQConsumer(String consumerGroup, String topic, MessageListenerConcurrently listener) {try {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr("localhost:9876");consumer.subscribe(topic, "*");consumer.registerMessageListener(listener);consumer.start();System.out.println("RocketMQ消费者启动成功......");} catch(MQClientException e) {System.err.println("初始化RocketMQ消费者失败......" + e);}}private static class Singleton {static RocketMQConsumer instance = new RocketMQConsumer(consumerGroup, topic, listener);}public RocketMQConsumer getInstance() {return Singleton.instance;}public static void setConsumerGroup(String consumerGroup) {RocketMQConsumer.consumerGroup = consumerGroup;}public static void setTopic(String topic) {RocketMQConsumer.topic = topic;}public static void setListener(MessageListenerConcurrently listener) {RocketMQConsumer.listener = listener;}public static RocketMQConsumer init() {return Singleton.instance;}
}
然后在seckill-operation模块下新增秒杀商品时发送消息到RocketMQ,这个新增秒杀商品的消息会被seckill-page模块消费来渲染秒杀商品页面。
//秒杀运营服务的接口
@RestController
@RequestMapping("/seckill/operation")
public class SeckillOperationController {//秒杀场次Service组件@Autowiredprivate SeckillSessionService seckillSessionService;//秒杀商品Service组件@Autowiredprivate SeckillProductService seckillProductService;//增加秒杀场次的接口@GetMapping("/session/add")public String addSeckillSession(SeckillSession seckillSession) {seckillSessionService.add(seckillSession);return "success";}//增加秒杀场次下商品的接口@GetMapping("/product/add")public String addSeckillProduct(SeckillProduct seckillProduct) {seckillProductService.add(seckillProduct);DefaultMQProducer producer = RocketMQProducer.getInstance().getProducer();try {Message message = new Message("seckill_product_added_topic", null, JSONObject.toJSONString(seckillProduct).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(message);System.out.printf("%s%n", sendResult);System.out.println("增加秒杀商品时,发送消息到MQ成功");} catch(Exception e) {System.err.println("增加秒杀商品时,发送消息到MQ失败" + e);return "failure";}return "success";}
}
5.消费秒杀商品新增消息时渲染秒杀页面
在seckill-page模块会消费新增秒杀商品消息,然后使用freemarker渲染秒杀商品页面。
@Component
public class BootListener implements CommandLineRunner {public void run(String... strings) throws Exception {RocketMQConsumer.setConsumerGroup("seckill-page-consumer-group");RocketMQConsumer.setTopic("seckill-product-added-topic");RocketMQConsumer.setListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {String seckillProductJSON = new String(messageExt.getBody());JSONObject seckillProductJSONObject = JSONObject.parseObject(seckillProductJSON);Long productId = seckillProductJSONObject.getLong("productId");Double seckillPrice = seckillProductJSONObject.getDouble("seckillPrice");Long seckillStock = seckillProductJSONObject.getLong("seckillStock");FreemarkerHelper viewEngine = new FreemarkerHelper();Map<String, Object> paras = new HashMap<String, Object>();paras.put("productId", productId);paras.put("seckillPrice", seckillPrice);paras.put("seckillStock", seckillStock);String html = viewEngine.parseTemplate("autolist.ftl", paras);System.out.println("将渲染完毕的秒杀商品html页面写入磁盘文件......");System.out.println(html);System.out.println("将磁盘上的html文件使用scp命令传送到nginx服务器上去......");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});RocketMQConsumer.init();}
}//freemarker辅助组件
public class FreemarkerHelper {private static Configuration _tplConfig = new Configuration();static {try {_tplConfig.setDirectoryForTemplateLoading(new File("xxx/xxx/xxx"));} catch (IOException e) {e.printStackTrace();}}//解析freemarker模板public String parseTemplate(String tplName, String encoding, Map<String, Object> paras) {try {StringWriter swriter = new StringWriter();Template mytpl = null;mytpl = _tplConfig.getTemplate(tplName, encoding);mytpl.process(paras, swriter);return swriter.toString();} catch (Exception e) {e.printStackTrace();return e.toString();}}public String parseTemplate(String tplName, Map<String, Object> paras) {return this.parseTemplate(tplName, "utf-8", paras);}
}
6.对秒杀商品的库存进行初始化
在seckill-inventory模块中消费新增秒杀商品消息来冻结库存,然后把秒杀商品的库存进行分片,放在Redis各节点上去。
@Component
public class BootListener implements CommandLineRunner {public void run(String... strings) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("seckill-inventory-consumer-group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("seckill_product_added_topic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : messageExts) {System.out.println(new String(messageExt.getBody()));//获取秒杀场次里增加的是哪个商品String seckillProductJSON = new String(messageExt.getBody());JSONObject seckillProductJSONObject = JSONObject.parseObject(seckillProductJSON);//调用库存中心提供的接口,冻结商品用于秒杀活动的库存Long productId = seckillProductJSONObject.getLong("productId");Double seckillPrice = seckillProductJSONObject.getDouble("seckillPrice");Long seckillStock = seckillProductJSONObject.getLong("seckillStock");System.out.println("调用库存中心提供的接口,冻结商品用于秒杀活动的库存");//库存中心:可售库存、锁定库存、已售库存、冻结库存//把秒杀商品的库存进行分片,放在Redis集群各个节点上RedisCluster redisCluster = RedisCluster.getInstance();redisCluster.initSeckillProductStock(productId, seckillStock);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("消费者启动......");}
}
在seckill-common模块中实现库存分片的组件:
//基于Redis集群进行数据分片的工具类
public class RedisCluster {//Redis集群对应的Jedis列表private List<Jedis> cluster = new ArrayList<Jedis>();//私有构造函数private RedisCluster() {//初始化Redis集群cluster.add(new Jedis("127.0.0.1", 6479));cluster.add(new Jedis("127.0.0.1", 6579));cluster.add(new Jedis("127.0.0.1", 6679));}//单例private static class Singleton {static RedisCluster instance = new RedisCluster();}//获取单例public static RedisCluster getInstance() {return Singleton.instance;}//初始化秒杀商品库存public void initSeckillProductStock(Long productId, Long seckillStock) {//计算每个Redis节点上的库存分片数量int clusterSize = cluster.size();Long seckillStockPerNode = seckillStock / clusterSize;Long remainSeckillStock = seckillStock - seckillStockPerNode * clusterSize;Long seckillStockLastNode = seckillStockPerNode + remainSeckillStock;System.out.println("每个Redis节点(除最后一个节点外)的库存数量为:" + seckillStockPerNode);System.out.println("最后一个Redis节点的库存数量为:" + seckillStockLastNode);System.out.println("Redis集群的总库存数量为:" + (seckillStockPerNode * (clusterSize - 1) + seckillStockLastNode));//对除最后一个redis节点之外的其他节点,进行库存分片数据初始化for (int i = 0; i < cluster.size() - 1; i++) {Jedis jedis = cluster.get(i);JSONObject jsonObject = new JSONObject();jsonObject.put("saling_stock", seckillStockPerNode);jsonObject.put("locked_stock", 0);jsonObject.put("saled_stock", 0);jedis.set("seckill::product::" + productId + "::stock", jsonObject.toJSONString());}//对Redis集群最后一个节点,进行库存分片数据初始化Jedis jedis = cluster.get(cluster.size() - 1);JSONObject jsonObject = new JSONObject();jsonObject.put("saling_stock", seckillStockLastNode);jsonObject.put("locked_stock", 0);jsonObject.put("saled_stock", 0);jedis.set("seckill::product::" + productId + "::stock", jsonObject.toJSONString());}
}
7.基于Lua开发OpenResty中内嵌的限流Lua脚本
-- 限流可以分为:全局限流 + 业务限流-- 全局限流,这里假设写死为10000,每秒最多可以放10000流量进入秒杀系统
-- 当然全局限流数也是可以是动态可配置的,动态配置时可以配置在Redis中
globalLimiting = 10000-- 业务限流,需要先获取Redis里当前的秒杀场次,再获取场次里每个商品的限购数量
-- 比如对该秒杀场次下的商品的最大抢购请求数 = 其限购数量 * 1.1
currentSessionId = 101
currentSessionProductLimiting = {}
currentSessionProductLimiting[518] = 1000
currentSessionProductLimiting[629] = 10000
currentSessionProductLimiting[745] = 200-- 此时在OpenResty里,过来了一个请求
-- 首先进行全局限流,每秒最多可以放行1w个请求
-- 下面定义一个变量currentRequests,存放当前这一秒放放行的请求数量
currentTime = nil
currentRequests = 0
currentProductRequests = {}-- 获取当前时间戳
timestamp = os.date("%Y-%m-%d %H:%M:%S")
if (currentTime == nil)
thencurrentTime = timestamp
end-- 判断currentTime是否属于当前这一秒,如果是则对当前这一秒的数量进行累加
if (timestamp == currentTime)
thenif (currentRequests <= globalLimiting)then-- OpenResty支持提取HTTP请求的请求参数、请求信息等-- 下面假设提取本次抢购请求的秒杀商品id为518local productId = 518local productRequests = currentProductRequests[productId]if (productRequests == nil or productRequests == 0)then-- 放行HTTP请求currentProductRequests[productId] = 1currentRequests = currentRequests + 1elselocal productLimiting = currentSessionProductLimiting[productId]-- 商品的最大抢购请求数 = 其限购数量 * 1.1if (productRequests <= productLimiting * 1.1)then-- 放行HTTP请求currentProductRequests[productId] = productRequests + 1currentRequests = currentRequests + 1else-- 秒杀商品的放行请求数已超过其限购数量的1.1倍-- 此时进行业务限流,返回响应给客户端,通知用户抢购失败endendelse-- 这一秒内的请求数量超过了10000,进行全局限流-- 利用OpenResty返回一个预定义的响应给客户端,通知用户抢购失败end
else-- 新的一秒重置currentTime和currentRequests,并放行当前请求currentTime = timestampcurrentRequests = 1
end
8.实现秒杀抢购流程的核心代码
(1)使用Hash数据结构来重构秒杀库存数据
(2)实现处理秒杀抢购请求的HTTP接口
(3)库存分片组件RedisCluster实现秒杀抢购逻辑
(4)消费秒杀抢购成功的消息生成抢购订单
(5)消费订单创建成功的消息更新秒杀商品库存
(1)使用Hash数据结构来重构秒杀库存数据
修改seckill-common模块中的RedisCluster:
//基于Redis集群进行数据分片的工具类
public class RedisCluster {//Redis集群对应的Jedis列表private List<Jedis> cluster = new ArrayList<Jedis>();//私有构造函数private RedisCluster() {//初始化Redis集群cluster.add(new Jedis("127.0.0.1", 6479));cluster.add(new Jedis("127.0.0.1", 6579));cluster.add(new Jedis("127.0.0.1", 6679));}//单例private static class Singleton {static RedisCluster instance = new RedisCluster();}//获取单例public static RedisCluster getInstance() {return Singleton.instance;}//初始化秒杀商品库存public void initSeckillProductStock(Long productId, Long seckillStock) {//计算每个Redis节点上的库存分片数量int clusterSize = cluster.size();Long seckillStockPerNode = seckillStock / clusterSize;Long remainSeckillStock = seckillStock - seckillStockPerNode * clusterSize;Long seckillStockLastNode = seckillStockPerNode + remainSeckillStock;System.out.println("每个Redis节点(除最后一个节点外)的库存数量为:" + seckillStockPerNode);System.out.println("最后一个Redis节点的库存数量为:" + seckillStockLastNode);System.out.println("Redis集群的总库存数量为:" + (seckillStockPerNode * (clusterSize - 1) + seckillStockLastNode));//对除最后一个redis节点之外的其他节点,进行库存分片数据初始化for (int i = 0; i < cluster.size() - 1; i++) {Jedis jedis = cluster.get(i);Map<String, String> dataMap = new HashMap<String, String>();dataMap.put("salableStock", String.valueOf(seckillStockPerNode));dataMap.put("lockedStock", "0");dataMap.put("soldStock", "0");jedis.hset("seckill::product::" + productId + "::stock", dataMap);}//对Redis集群最后一个节点,进行库存分片数据初始化Jedis jedis = cluster.get(cluster.size() - 1);Map<String, String> dataMap = new HashMap<String, String>();dataMap.put("salableStock", String.valueOf(seckillStockLastNode));dataMap.put("lockedStock", "0");dataMap.put("soldStock", "0");jedis.hset("seckill::product::" + productId + "::stock", dataMap);}...
}
(2)实现处理秒杀抢购请求的HTTP接口
在seckill-flash-sale模块中添加:
//秒杀抢购请求处理接口
@RestController
@RequestMapping("/seckill/flash/sale")
public class FlashSaleController {//用户对商品进行抢购,默认限定每个商品最多只能抢购一件@GetMapping("/")public String flashSale(Long userId, Long productId) {//秒杀抢购代码的核心是Lua脚本,需要实现当库存分片为0时,自动进行库存分片节点迁移RedisCluster redisCluster = RedisCluster.getInstance();Boolean flashSaleResult = redisCluster.flashSale(userId, productId);//如果秒杀抢购成功了,则发送消息到MQ进行异步下单if (flashSaleResult) {DefaultMQProducer producer = RocketMQProducer.getInstance().getProducer();try {JSONObject flashSaleSuccessInform = new JSONObject();flashSaleSuccessInform.put("userId", userId);flashSaleSuccessInform.put("productId", productId);Message message = new Message("flash_sale_success_inform", null, flashSaleSuccessInform.toJSONString().getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(message);System.out.printf("%s%n", sendResult);System.out.println("发送秒杀抢购成功的消息到MQ成功......");} catch(Exception e) {System.err.println("发送秒杀抢购成功的消息到MQ失败:" + e);return "秒杀抢购成功,但是推送消息到MQ失败";}return "秒杀抢购成功";} else {return "秒杀抢购失败";}}
}
(3)库存分片组件RedisCluster实现秒杀抢购逻辑
修改seckill-common模块中的RedisCluster:
//基于Redis集群进行数据分片的工具类
public class RedisCluster {//Redis集群对应的Jedis列表private List<Jedis> cluster = new ArrayList<Jedis>();//私有构造函数private RedisCluster() {//初始化redis集群cluster.add(new Jedis("127.0.0.1", 6479));cluster.add(new Jedis("127.0.0.1", 6579));cluster.add(new Jedis("127.0.0.1", 6679));}//单例private static class Singleton {static RedisCluster instance = new RedisCluster();}//获取单例public static RedisCluster getInstance() {return Singleton.instance;}//初始化秒杀商品库存public void initSeckillProductStock(Long productId, Long seckillStock) {//计算每个Redis节点上的库存分片数量int clusterSize = cluster.size();Long seckillStockPerNode = seckillStock / clusterSize;Long remainSeckillStock = seckillStock - seckillStockPerNode * clusterSize;Long seckillStockLastNode = seckillStockPerNode + remainSeckillStock;System.out.println("每个Redis节点(除最后一个节点外)的库存数量为:" + seckillStockPerNode);System.out.println("最后一个Redis节点的库存数量为:" + seckillStockLastNode);System.out.println("Redis集群的总库存数量为:" + (seckillStockPerNode * (clusterSize - 1) + seckillStockLastNode));//对除最后一个Redis节点之外的其他节点,进行库存分片数据初始化for (int i = 0; i < cluster.size() - 1; i++) {Jedis jedis = cluster.get(i);Map<String, String> dataMap = new HashMap<String, String>();dataMap.put("salableStock", String.valueOf(seckillStockPerNode));dataMap.put("lockedStock", "0");dataMap.put("soldStock", "0");jedis.hset("seckill::product::" + productId + "::stock", dataMap);}//对Redis集群最后一个节点,进行库存分片数据初始化Jedis jedis = cluster.get(cluster.size() - 1);Map<String, String> dataMap = new HashMap<String, String>();dataMap.put("salableStock", String.valueOf(seckillStockLastNode));dataMap.put("lockedStock", "0");dataMap.put("soldStock", "0");jedis.hset("seckill::product::" + productId + "::stock", dataMap);}//基于Redis集群进行秒杀抢购public Boolean flashSale(Long userId, Long productId) {//随机选择一个Redis的节点int redisNodeCount = cluster.size();int chosenRedisNodeIndex = new Random().nextInt(redisNodeCount);Jedis chosenRedisNode = cluster.get(chosenRedisNodeIndex);//向Redis节点提交一个Lua脚本进行抢购String flashSaleLuaScript = ""+ "local productKey = 'seckill::product::" + productId + "::stock';"+ "local salableStock = redis.call('hget', productKey, 'salableStock');"+ "local lockedStock = redis.call('hget', productKey, 'lockedStock');"+ "if (salableStock > 0) "+ "then "+ " redis.call('hset', productKey, 'salableStock', salableStock - 1);"+ " redis.call('hset', productKey, 'lockedStock', lockedStock + 1);"+ " return 'success';"+ "else "+ " return 'fail';"+ "end;";//通过Jedis的eval()方法执行Lua脚本String flashSaleResult = (String) chosenRedisNode.eval(flashSaleLuaScript);//如果秒杀抢购成功了if ("success".equals(flashSaleResult)) {JedisManager jedisManager = JedisManager.getInstance();Jedis jedis = jedisManager.getJedis();//记录用户秒杀成功的商品库存属于哪个分片jedis.set("flash_sale::stock_shard::" + userId + "::" + productId, String.valueOf(chosenRedisNodeIndex));return true;}//如果第一次秒杀抢购失败,则进行库存分片迁移的操作//当然可以继续优化为,库存分片为0的节点后续不会再被选中else {Boolean flashSaleSuccess = false;for (int i = 0; i < cluster.size(); i++) {if (i != chosenRedisNodeIndex) {Jedis redisNode = cluster.get(i);flashSaleResult = (String) redisNode.eval(flashSaleLuaScript);if ("success".equals(flashSaleResult)) {JedisManager jedisManager = JedisManager.getInstance();Jedis jedis = jedisManager.getJedis();jedis.set("flash_sale::stock_shard::" + userId + "::" + productId, String.valueOf(i));flashSaleSuccess = true;break;}}}return flashSaleSuccess;}}...
}
(4)消费秒杀抢购成功的消息生成抢购订单
订单中心创建完订单会发送订单创建成功的消息到MQ,在seckill-order模块中添加如下代码:
@Component
public class BootListener implements CommandLineRunner {public static final Long ORDER_RATE_LIMIT = 500L;//系统启动会自动执行run()方法public void run(String... strings) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("seckill-order-consumer-group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("flash_sale_success_inform", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : messageExts) {System.out.println(new String(messageExt.getBody()));//抢购成功的通知JSONObject flashSaleSuccessInform = JSONObject.parseObject(new String(messageExt.getBody()));//调用订单中心提供的接口进行秒杀抢购的下单,订单中心创建完订单会发送订单创建成功的消息到MQLong userId = flashSaleSuccessInform.getLong("userId");Long productId = flashSaleSuccessInform.getLong("productId");//TODO: 在这里需要对下单进行简单限流(固定窗口算法)JedisManager jedisManager = JedisManager.getInstance();Jedis jedis = jedisManager.getJedis();Boolean orderResult = false;while (!orderResult) {Date now = new Date();SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String currentSecond = dateFormat.format(now);Long result = jedis.incr("flash_sale::order::rate_limit::" + currentSecond);if (result < ORDER_RATE_LIMIT) {System.out.println("调用调用订单中心提供的接口进行秒杀抢购的下单,用户id: " + userId + ", 商品id: " + productId);orderResult = true;} else {//如果当前这一秒限流了,此时休眠一秒,下一秒继续进行下单即可try {Thread.sleep(1000);} catch(InterruptedException e) {e.printStackTrace();}}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("消费者启动......");}
}
(5)消费订单创建成功的消息更新秒杀商品库存
在seckill-inventory模块中添加如下代码:
@Component
public class BootListener implements CommandLineRunner {public void run(String... strings) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("seckill-inventory-consumer-group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("seckill_product_added_topic", "*"); consumer.subscribe("order_pay_result_inform", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : messageExts) {System.out.println(new String(messageExt.getBody()));String topic = messageExt.getTopic();if (topic.equals("seckill_product_added_topic")) {//获取秒杀场次里增加的是哪个商品String seckillProductJSON = new String(messageExt.getBody());JSONObject seckillProductJSONObject = JSONObject.parseObject(seckillProductJSON);//调用库存中心提供的接口,冻结商品用于秒杀活动的库存Long productId = seckillProductJSONObject.getLong("productId");Double seckillPrice = seckillProductJSONObject.getDouble("seckillPrice");Long seckillStock = seckillProductJSONObject.getLong("seckillStock");JedisManager jedisManager = JedisManager.getInstance();Jedis jedis = jedisManager.getJedis();String inventoryInitedFlag = jedis.get("seckill::product-inventory-inited::flag::" + productId);if (inventoryInitedFlag != null && inventoryInitedFlag.equals("inited")) {continue;}jedis.set("seckill::product-inventory-inited::flag::" + productId, "inited");System.out.println("调用库存中心提供的接口,冻结商品用于秒杀活动的库存");//库存中心:可售库存、锁定库存、已售库存、冻结库存//对秒杀商品的库存进行分片,存放在Redis各节点上RedisCluster redisCluster = RedisCluster.getInstance();redisCluster.initSeckillProductStock(productId,seckillStock);} else if (topic.equals("order_pay_result_inform")) {//解析订单支付结果的通知JSONObject orderPayResult = JSONObject.parseObject(new String(messageExt.getBody()));Long userId = orderPayResult.getLong("userId");Long productId = orderPayResult.getLong("productId");Boolean orderPaySuccess = orderPayResult.getInteger("orderPaySuccess") == 1 ? true : false;//幂等性保障JedisManager jedisManager = JedisManager.getInstance();Jedis jedis = jedisManager.getJedis();String orderPayResultProcessedFlag = jedis.get("seckill::order-pay-result-processed::flag::" + userId + "::" + productId);if (orderPayResultProcessedFlag != null && orderPayResultProcessedFlag.equals("processed")) {continue;}jedis.set("seckill::order-pay-result-processed::flag::" + userId + "::" + productId, "processed");//获取当时秒杀成功时的库存分片所在Redis节点String stockShardRedisNode = jedis.get("flash_sale::stock_shard::" + userId + "::" + productId);RedisCluster redisCluster = RedisCluster.getInstance();if (orderPaySuccess) {//如果秒杀订单支付成功redisCluster.flashSaleOrderPaySuccess(stockShardRedisNode, productId);} else {//如果秒杀订单支付失败或取消redisCluster.flashSaleOrderPayFail(stockShardRedisNode, productId);}System.out.println("秒杀抢购商品的订单支付结果处理成功......");}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("消费者启动......");}
}
在RedisCluster中增加订单支付成功和支付失败时的库存处理逻辑:
//基于Redis集群进行数据分片的工具类
public class RedisCluster {//Redis集群对应的Jedis列表private List<Jedis> cluster = new ArrayList<Jedis>();//私有构造函数private RedisCluster() {//初始化redis集群cluster.add(new Jedis("127.0.0.1", 6479));cluster.add(new Jedis("127.0.0.1", 6579));cluster.add(new Jedis("127.0.0.1", 6679));}//单例private static class Singleton {static RedisCluster instance = new RedisCluster();}//获取单例public static RedisCluster getInstance() {return Singleton.instance;}//初始化秒杀商品库存public void initSeckillProductStock(Long productId, Long seckillStock) {//计算每个Redis节点上的库存分片数量int clusterSize = cluster.size();Long seckillStockPerNode = seckillStock / clusterSize;Long remainSeckillStock = seckillStock - seckillStockPerNode * clusterSize;Long seckillStockLastNode = seckillStockPerNode + remainSeckillStock;System.out.println("每个Redis节点(除最后一个节点外)的库存数量为:" + seckillStockPerNode);System.out.println("最后一个Redis节点的库存数量为:" + seckillStockLastNode);System.out.println("Redis集群的总库存数量为:" + (seckillStockPerNode * (clusterSize - 1) + seckillStockLastNode));//对除最后一个Redis节点之外的其他节点,进行库存分片数据初始化for (int i = 0; i < cluster.size() - 1; i++) {Jedis jedis = cluster.get(i);Map<String, String> dataMap = new HashMap<String, String>();dataMap.put("salableStock", String.valueOf(seckillStockPerNode));dataMap.put("lockedStock", "0");dataMap.put("soldStock", "0");jedis.hset("seckill::product::" + productId + "::stock", dataMap);}//对Redis集群最后一个节点,进行库存分片数据初始化Jedis jedis = cluster.get(cluster.size() - 1);Map<String, String> dataMap = new HashMap<String, String>();dataMap.put("salableStock", String.valueOf(seckillStockLastNode));dataMap.put("lockedStock", "0");dataMap.put("soldStock", "0");jedis.hset("seckill::product::" + productId + "::stock", dataMap);}//基于Redis集群进行秒杀抢购public Boolean flashSale(Long userId, Long productId) {//随机选择一个Redis的节点int redisNodeCount = cluster.size();int chosenRedisNodeIndex = new Random().nextInt(redisNodeCount);Jedis chosenRedisNode = cluster.get(chosenRedisNodeIndex);//向Redis节点提交一个Lua脚本进行抢购String flashSaleLuaScript = ""+ "local productKey = 'seckill::product::" + productId + "::stock';"+ "local salableStock = redis.call('hget', productKey, 'salableStock');"+ "local lockedStock = redis.call('hget', productKey, 'lockedStock');"+ "if (salableStock > 0) "+ "then "+ " redis.call('hset', productKey, 'salableStock', salableStock - 1);"+ " redis.call('hset', productKey, 'lockedStock', lockedStock + 1);"+ " return 'success';"+ "else "+ " return 'fail';"+ "end;";//通过Jedis的eval()方法执行Lua脚本String flashSaleResult = (String) chosenRedisNode.eval(flashSaleLuaScript);//如果秒杀抢购成功了if ("success".equals(flashSaleResult)) {JedisManager jedisManager = JedisManager.getInstance();Jedis jedis = jedisManager.getJedis();//记录用户秒杀成功的商品库存属于哪个分片jedis.set("flash_sale::stock_shard::" + userId + "::" + productId, String.valueOf(chosenRedisNodeIndex));return true;}//如果第一次秒杀抢购失败,则进行库存分片迁移的操作//当然可以继续优化为,库存分片为0的节点后续不会再被选中else {Boolean flashSaleSuccess = false;for (int i = 0; i < cluster.size(); i++) {if (i != chosenRedisNodeIndex) {Jedis redisNode = cluster.get(i);flashSaleResult = (String) redisNode.eval(flashSaleLuaScript);if ("success".equals(flashSaleResult)) {JedisManager jedisManager = JedisManager.getInstance();Jedis jedis = jedisManager.getJedis();jedis.set("flash_sale::stock_shard::" + userId + "::" + productId, String.valueOf(i));flashSaleSuccess = true;break;}}}return flashSaleSuccess;}}//秒杀订单支付成功的库存处理逻辑public void flashSaleOrderPaySuccess(String stockShardRedisNode, Long productId) {Integer redisNodeIndex = Integer.valueOf(stockShardRedisNode);Jedis redisNode = cluster.get(redisNodeIndex);String luaScript = ""+ "local productKey = 'seckill::product::" + productId + "::stock';"+ "local lockedStock = redis.call('hget', productKey, 'lockedStock');"+ "local soldStock = redis.call('hget', productKey, 'soldStock');"+ "redis.call('hset', productKey, 'lockedStock', lockedStock - 1);"+ "redis.call('hset', productKey, 'soldStock', lockedStock + 1);";redisNode.eval(luaScript);}//秒杀订单支付失败的库存处理逻辑public void flashSaleOrderPayFail(String stockShardRedisNode, Long productId) {Integer redisNodeIndex = Integer.valueOf(stockShardRedisNode);Jedis redisNode = cluster.get(redisNodeIndex);String luaScript = ""+ "local productKey = 'seckill::product::" + productId + "::stock';"+ "local salableStock = redis.call('hget', productKey, 'salableStock');"+ "local lockedStock = redis.call('hget', productKey, 'lockedStock');"+ "redis.call('hset', productKey, 'lockedStock', lockedStock - 1);"+ "redis.call('hset', productKey, 'salableStock', salableStock + 1);";redisNode.eval(luaScript);}
}
9.实现秒杀系统高可用架构的伪代码
(1)Redis集群故障时的高可用
(2)MQ集群故障时的高可用
(3)订单系统异常时的高可用
(1)Redis集群故障时的高可用
某个库存分片故障时自动迁移其他库存分片,或者将抢购请求刷盘并启用线程处理。
//基于Redis集群进行数据分片的工具类
public class RedisCluster {//Redis集群对应的Jedis列表private List<Jedis> cluster = new ArrayList<Jedis>();//私有构造函数private RedisCluster() {//初始化redis集群cluster.add(new Jedis("127.0.0.1", 6479));cluster.add(new Jedis("127.0.0.1", 6579));cluster.add(new Jedis("127.0.0.1", 6679));}//单例private static class Singleton {static RedisCluster instance = new RedisCluster();}//获取单例public static RedisCluster getInstance() {return Singleton.instance;}//初始化秒杀商品库存public void initSeckillProductStock(Long productId, Long seckillStock) {//计算每个Redis节点上的库存分片数量int clusterSize = cluster.size();Long seckillStockPerNode = seckillStock / clusterSize;Long remainSeckillStock = seckillStock - seckillStockPerNode * clusterSize;Long seckillStockLastNode = seckillStockPerNode + remainSeckillStock;System.out.println("每个Redis节点(除最后一个节点外)的库存数量为:" + seckillStockPerNode);System.out.println("最后一个Redis节点的库存数量为:" + seckillStockLastNode);System.out.println("Redis集群的总库存数量为:" + (seckillStockPerNode * (clusterSize - 1) + seckillStockLastNode));//对除最后一个Redis节点之外的其他节点,进行库存分片数据初始化for (int i = 0; i < cluster.size() - 1; i++) {Jedis jedis = cluster.get(i);Map<String, String> dataMap = new HashMap<String, String>();dataMap.put("salableStock", String.valueOf(seckillStockPerNode));dataMap.put("lockedStock", "0");dataMap.put("soldStock", "0");jedis.hset("seckill::product::" + productId + "::stock", dataMap);}//对Redis集群最后一个节点,进行库存分片数据初始化Jedis jedis = cluster.get(cluster.size() - 1);Map<String, String> dataMap = new HashMap<String, String>();dataMap.put("salableStock", String.valueOf(seckillStockLastNode));dataMap.put("lockedStock", "0");dataMap.put("soldStock", "0");jedis.hset("seckill::product::" + productId + "::stock", dataMap);}//基于Redis集群进行秒杀抢购public Boolean flashSale(Long userId, Long productId) {//随机选择一个Redis的节点int redisNodeCount = cluster.size();int chosenRedisNodeIndex = new Random().nextInt(redisNodeCount);Jedis chosenRedisNode = cluster.get(chosenRedisNodeIndex);//向Redis节点提交一个Lua脚本进行抢购String flashSaleLuaScript = ""+ "local productKey = 'seckill::product::" + productId + "::stock';"+ "local salableStock = redis.call('hget', productKey, 'salableStock');"+ "local lockedStock = redis.call('hget', productKey, 'lockedStock');"+ "if (salableStock > 0) "+ "then "+ " redis.call('hset', productKey, 'salableStock', salableStock - 1);"+ " redis.call('hset', productKey, 'lockedStock', lockedStock + 1);"+ " return 'success';"+ "else "+ " return 'fail';"+ "end;";//通过Jedis的eval()方法执行Lua脚本String flashSaleResult = (String) chosenRedisNode.eval(flashSaleLuaScript);String flashSaleResult = null;try {flashSaleResult = (String) chosenRedisNode.eval(flashSaleLuaScript);} catch(Exception e) {//如果在这里报错了,那么很有可能就是某一台Redis机器崩溃了//此时可能只是一部分的库存分片不可用,所以可以去找其他库存分片来进行秒杀抢购try {return tryOtherStockShard(chosenRedisNodeIndex, flashSaleLuaScript, userId, productId);} catch(Exception e1) {//高可用TODO//如果在这里报错,就表明所有的Redis节点都崩溃了//此时可以尝试把抢购请求写入到本地磁盘,让用户的抢购状态保持在抢购中,并开启线程进行处理return false;}}//如果秒杀抢购成功if ("success".equals(flashSaleResult)) {JedisManager jedisManager = JedisManager.getInstance();Jedis jedis = jedisManager.getJedis();jedis.set("flash_sale::stock_shard::" + userId + "::" + productId, String.valueOf(chosenRedisNodeIndex));return true;} else {//如果第一次秒杀抢购失败了,则进行库存分片迁移的操作try {return tryOtherStockShard(chosenRedisNodeIndex, flashSaleLuaScript, userId, productId);} catch(Exception e) {//高可用TODO//如果在这里报错,就表明所有的Redis节点都崩溃了//此时可以尝试把抢购请求写入到本地磁盘,让用户的抢购状态保持在抢购中,并开启线程进行处理return false;}}}//尝试其他的库存分片节点private Boolean tryOtherStockShard(int failedStockShard, String flashSaleLuaScript, Long userId, Long productId) throws Exception {String flashSaleResult = null;Boolean flashSaleSuccess = false;Boolean allRedisNodeCrashed = true;for (int i = 0; i < cluster.size(); i++) {if( i != failedStockShard) {try {Jedis redisNode = cluster.get(i);flashSaleResult = (String) redisNode.eval(flashSaleLuaScript);allRedisNodeCrashed = false;if ("success".equals(flashSaleResult)) {JedisManager jedisManager = JedisManager.getInstance();Jedis jedis = jedisManager.getJedis();jedis.set("flash_sale::stock_shard::" + userId + "::" + productId, String.valueOf(i));flashSaleSuccess = true;break;}} catch(Exception e) {//高可用TODO//在尝试其他节点进行抢购的时候,其他某个节点也出现了宕机问题}}}//如果所有的Redis节点都崩溃了if (allRedisNodeCrashed) {throw new Exception("所有Redis节点都崩溃了!!!");}return flashSaleSuccess;}//秒杀订单支付成功的库存处理逻辑public void flashSaleOrderPaySuccess(String stockShardRedisNode, Long productId) {Integer redisNodeIndex = Integer.valueOf(stockShardRedisNode);Jedis redisNode = cluster.get(redisNodeIndex);String luaScript = ""+ "local productKey = 'seckill::product::" + productId + "::stock';"+ "local lockedStock = redis.call('hget', productKey, 'lockedStock');"+ "local soldStock = redis.call('hget', productKey, 'soldStock');"+ "redis.call('hset', productKey, 'lockedStock', lockedStock - 1);"+ "redis.call('hset', productKey, 'soldStock', lockedStock + 1);";redisNode.eval(luaScript);}//秒杀订单支付失败的库存处理逻辑public void flashSaleOrderPayFail(String stockShardRedisNode, Long productId) {Integer redisNodeIndex = Integer.valueOf(stockShardRedisNode);Jedis redisNode = cluster.get(redisNodeIndex);String luaScript = ""+ "local productKey = 'seckill::product::" + productId + "::stock';"+ "local salableStock = redis.call('hget', productKey, 'salableStock');"+ "local lockedStock = redis.call('hget', productKey, 'lockedStock');"+ "redis.call('hset', productKey, 'lockedStock', lockedStock - 1);"+ "redis.call('hset', productKey, 'salableStock', salableStock + 1);";redisNode.eval(luaScript);}
}
(2)MQ集群故障时的高可用
将MQ消息写入磁盘并启用线程进行处理。
//秒杀抢购请求处理接口
@RestController
@RequestMapping("/seckill/flash/sale")
public class FlashSaleController {//用户对商品进行抢购,默认限定每个商品最多只能抢购一件@GetMapping("/")public String flashSale(Long userId, Long productId) {//秒杀抢购代码的核心是Lua脚本,需要实现当库存分片为0时,自动进行库存分片节点迁移RedisCluster redisCluster = RedisCluster.getInstance();Boolean flashSaleResult = redisCluster.flashSale(userId, productId);//如果秒杀抢购成功了,则发送消息到MQ进行异步下单if (flashSaleResult) {DefaultMQProducer producer = RocketMQProducer.getInstance().getProducer();try {JSONObject flashSaleSuccessInform = new JSONObject();flashSaleSuccessInform.put("userId", userId);flashSaleSuccessInform.put("productId", productId);Message message = new Message("flash_sale_success_inform", null, flashSaleSuccessInform.toJSONString().getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(message);System.out.printf("%s%n", sendResult);System.out.println("发送秒杀抢购成功的消息到MQ成功......");} catch(Exception e) {System.err.println("发送秒杀抢购成功的消息到MQ失败:" + e);//高可用TODO//可以把MQ消息写入到本地磁盘的进行积压,然后开启一个后台线程不停尝试MQ是否恢复//如果MQ恢复了,就可以把本地磁盘积压的消息发送出去return "秒杀抢购成功,但是推送消息到MQ失败";}return "秒杀抢购成功";} else {return "秒杀抢购失败";}}
}
(3)订单系统异常时的高可用
判断MQ消息是否积压超过某时间,若是则进行快速失败释放库存。此外订单系统异常时,需要阻塞消费MQ消息的线程,之后再重试。
@Component
public class BootListener implements CommandLineRunner {public static final Long ORDER_RATE_LIMIT = 500L;public void run(String... strings) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("seckill-order-consumer-group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("flash_sale_success_inform", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : messageExts) {System.out.println(new String(messageExt.getBody()));//抢购成功的通知JSONObject flashSaleSuccessInform = JSONObject.parseObject(new String(messageExt.getBody()));//调用订单中心提供的接口进行秒杀抢购的下单Long userId = flashSaleSuccessInform.getLong("userId");Long productId = flashSaleSuccessInform.getLong("productId");//高可用TODO//对每个抢购成功的消息都获取一下其发送的时间戳//然后判断该消息是否已经在MQ里积压超过半个小时,如果是就进行快速失败//通过fail-fast机制,直接推送一条快速失败的消息到MQ//让库存系统消费该快速失败的消息,释放掉抢购商品的库存//接着再更新用户抢购的状态为抢购失败//在这里需要对下单进行限流,防止大量请求访问订单系统让订单系统产生压力JedisManager jedisManager = JedisManager.getInstance();Jedis jedis = jedisManager.getJedis();Boolean orderResult = false;while (!orderResult) {Date now = new Date();SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String currentSecond = dateFormat.format(now);Long result = jedis.incr("flash_sale::order::rate_limit::" + currentSecond);if (result < ORDER_RATE_LIMIT) {System.out.println("调用订单中心提供的接口进行秒杀抢购的下单,用户id: " + userId + ", 商品id: " + productId);//高可用TODO//如果订单系统崩溃了,那么执行到此处的消费线程应进入阻塞,不能继续消费后面的消息了//阻塞个几分钟过后,再尝试调用订单系统进行下单orderResult = true;} else {//如果当前这一秒限流了,此时休眠一秒,下一秒继续进行下单即可try {Thread.sleep(1000);} catch(InterruptedException e) {e.printStackTrace();}}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("消费者启动......");}
}