当前位置: 首页 > news >正文

商品中心—3.商品可采可补可售的技术文档下

大纲

1.可采可补可售业务的数据库建模设计

2.定时同步可采商品

3.定时同步可补商品

4.定时同步可售商品

5.商品中心架构梳理

(9)读取缓存与刷入缓存的逻辑

@Service
public class RecoverableServiceImpl implements RecoverableService {@Autowiredprivate RedisReadWriteManager redisReadWriteManager;...//对已经缓存的历史数据进行diff处理,处理缓存变更private void diffRecoverableCache(List<ProductSellerRelationBO> productSellerRelationBOList, List<SellerGroupResponse> sellerGroupResponses) {//1.获取卖家组ID集合Set<Long> sellerGroupIdList = sellerGroupResponses.stream().map(SellerGroupResponse::getSellerGroupId).collect(Collectors.toSet());//2.数据差集比较,并刷入缓存差异信息diffCache(sellerGroupIdList, productSellerRelationBOList);}//开始进行数据差集的处理private void diffCache(Set<Long> sellerIdList, List<ProductSellerRelationBO> productSellerRelationBOList) {//1.批量查询缓存Map<Long, List<String>> redisSetMap = redisReadWriteManager.getRedisSortedSet(sellerIdList, AbstractRedisKeyConstants::getSellerTypePurchaseSkuZsetKey);//转换缓存的值为具体的对象Map<Long, Map<String, ProductDetailDO>> productSellerRelationBOMap = redisManagerRepository.converterProductSellerCache(redisSetMap);//进行数据差集处理Map<String, RedisSortedSetCache> diffSortedSetCache = redisManagerRepository.diffProduct(productSellerRelationBOList, productSellerRelationBOMap, AbstractRedisKeyConstants::getSellerTypePurchaseSkuZsetKey);//执行数据缓存更新redisReadWriteManager.flushIncrSortedSetMap(diffSortedSetCache);}...
}@Service
public class RedisReadWriteManager {...//批量获取Sorted Setpublic <T> Map<T, List<String>> getRedisSortedSet(Collection<T> keys, Function<T, String> getRedisKeyFunction) {if (CollectionUtils.isEmpty(keys)) {return Maps.newHashMap();}Map<T, List<String>> responseMap = Maps.newHashMap();for (T key : keys) {String redisKey = getRedisKeyFunction.apply(key);responseMap.put(key, allRedisSortedSet(redisKey));}return responseMap;}//获取Sorted Set的所有数据private List<String> allRedisSortedSet(String redisKey) {Set<String> strings = redisCache.zrangeByScore(redisKey, 0L, Long.MAX_VALUE, 0L, Long.MAX_VALUE);List<String> sortedList = CollectionUtils.isEmpty(strings) ? new ArrayList<>() : new LinkedList<>(strings);return sortedList;}//刷新有序缓存public void flushIncrSortedSetMap(Map<String, RedisSortedSetCache> sortedSetSourceMap) {for (Map.Entry<String, RedisSortedSetCache> entry : sortedSetSourceMap.entrySet()) {//获取到缓存的key标志信息String key = entry.getKey();//缓存操作对象,每个卖家缓存一份RedisSortedSetCache sortedSetSource = entry.getValue();if (sortedSetSource.getDeleteKey()) {redisCache.delete(key);continue;}if (MapUtils.isNotEmpty(sortedSetSource.getAddMap())) {addSortedSet(sortedSetSource.getAddMap(), key);}if (!CollectionUtils.isEmpty(sortedSetSource.getDeleteMemberSet())) {removeSortedSet(key, sortedSetSource.getDeleteMemberSet());}}}//添加Sorted Setpublic void addSortedSet(Map<String, Double> addMap, String redisKey) {if (MapUtils.isEmpty(addMap)) {return;}for (Map.Entry<String, Double> entry : addMap.entrySet()) {String product = entry.getKey();Double score = entry.getValue();redisCache.zadd(redisKey, product, score);}}//删除Sorted Setpublic void removeSortedSet(String redisKey, Set<String> memberSet) {redisCache.zremove(redisKey, memberSet.toArray(new String[]{}));}...
}

(10)卖家组支持的可采商品与缓存的diff逻辑

@Repository
public class RedisManagerRepository {@Autowiredprivate SegmentIDGen segmentIDGen;...//将缓存的数据转换为实体对象public Map<Long, Map<String, ProductDetailDO>> converterProductSellerCache(Map<Long, List<String>> redisSetMap) {Map<Long, Map<String, ProductDetailDO>> productSellerRelationBOMap = new HashMap<>(redisSetMap.size());if (!CollectionUtils.isEmpty(redisSetMap)) {for (Map.Entry<Long, List<String>> entry : redisSetMap.entrySet()) {List<String> productSellerList = entry.getValue();Map<String, ProductDetailDO> productDetailMap = new HashMap<>(productSellerList.size());for (String content : productSellerList) {ProductDetailDO productDetailDO = JSONObject.parseObject(content, ProductDetailDO.class);productDetailMap.put(productDetailDO.getSkuId(), productDetailDO);}productSellerRelationBOMap.put(entry.getKey(), productDetailMap);}}return productSellerRelationBOMap;}//对缓存差集的数据进行处理public <T> Map<String, RedisSortedSetCache> diffProduct(List<ProductSellerRelationBO> productSellerRelationBOList,Map<Long, Map<String, ProductDetailDO>> productSellerCacheMap, Function<Long, String> getRedisKeyFunction) {Map<String, RedisSortedSetCache> redisSortedSetCacheMap = new HashMap<>();//1.处理缓存中需要新增的数据,删除数据Map<Long, ProductSellerRelationBO> productSellerRelationResidueMap = diffCacheAddOrDelete(productSellerRelationBOList, productSellerCacheMap, redisSortedSetCacheMap, getRedisKeyFunction);//2.处理缓存中不存在的卖家数据,新增处理diffAddSellerCache(productSellerRelationResidueMap, redisSortedSetCacheMap, getRedisKeyFunction);//3.处理缓存中存在的卖家数据,结果不存在,删除处理diffDeleteSellerCache(productSellerCacheMap, redisSortedSetCacheMap, getRedisKeyFunction);return redisSortedSetCacheMap;}//处理缓存中需要新增的数据,删除数据private <T> Map<Long, ProductSellerRelationBO> diffCacheAddOrDelete(List<ProductSellerRelationBO> productSellerRelationBOList, Map<Long, Map<String, ProductDetailDO>> productSellerCacheMap, Map<String, RedisSortedSetCache> redisSortedSetCacheMap, Function<Long, String> getRedisKeyFunction) {Map<Long, ProductSellerRelationBO> productSellerRelationResidueMap = new HashMap<>(productSellerRelationBOList.size());for (ProductSellerRelationBO productSellerRelation : productSellerRelationBOList) {RedisSortedSetCache redisSortedSetCache = new RedisSortedSetCache();Long sellerId = productSellerRelation.getSellerId();String redisKey = getRedisKeyFunction.apply(sellerId);//命中了缓存的数据,开始进行差集处理if (productSellerCacheMap.containsKey(sellerId)) {//获取当前这个卖家下的缓存商品信息,key为商品skuMap<String, ProductDetailDO> productDetailDOMap = productSellerCacheMap.get(sellerId);//卖家的处理后的可售商品列表List<ProductDetailDO> productDetailList = productSellerRelation.getProductDetailList();//处理返回差异的数据结果,需新增或者删除的数据集合for (ProductDetailDO productDetailDO : productDetailList) {//命中了则说明无差异,不处理,从集合中移除if (!CollectionUtils.isEmpty(productDetailDOMap) && productDetailDOMap.containsKey(productDetailDO.getSkuId())) {productDetailDOMap.remove(productDetailDO.getSkuId());continue;}//未命中,说明这个数据缓存中不存在,做新增处理redisSortedSetCache.getAddMap().put(JSONObject.toJSON(productDetailDO).toString(), getSortedSetScore(redisKey));}//遍历处理完成之后,缓存中还有多余的对象都属于无效数据,需要删除if (!CollectionUtils.isEmpty(productDetailDOMap)) {for (Map.Entry<String, ProductDetailDO> entry : productDetailDOMap.entrySet()) {redisSortedSetCache.getDeleteMemberSet().add(JSONObject.toJSON(entry.getValue()).toString());}}//设置到需要处理得缓存对象中redisSortedSetCacheMap.put(redisKey, redisSortedSetCache);//删除处理掉的缓存对象productSellerCacheMap.remove(sellerId);} else {//未命中缓存的 都是新增数据productSellerRelationResidueMap.put(sellerId, productSellerRelation);}}return productSellerRelationResidueMap;}//处理缓存中不存在的卖家数据,新增处理private void diffAddSellerCache(Map<Long, ProductSellerRelationBO> productSellerRelationMap, Map<String, RedisSortedSetCache> redisSortedSetCacheMap, Function<Long, String> getRedisKeyFunction) {Iterator<Map.Entry<Long, ProductSellerRelationBO>> iterator = productSellerRelationMap.entrySet().iterator();//对处理缓存差集后,还剩余的未处理数据做新增处理while (iterator.hasNext()) {Map.Entry<Long, ProductSellerRelationBO> entrys = iterator.next();ProductSellerRelationBO productSellerRelation = entrys.getValue();RedisSortedSetCache redisSortedSetCache = new RedisSortedSetCache();Long key = entrys.getKey();String redisKey = getRedisKeyFunction.apply(key);//卖家的处理后的可售商品列表List<ProductDetailDO> productDetailList = productSellerRelation.getProductDetailList();for (ProductDetailDO productDetailDO : productDetailList) {redisSortedSetCache.getAddMap().put(JSONObject.toJSON(productDetailDO).toString(), getSortedSetScore(redisKey));}//设置到需要处理得缓存对象中redisSortedSetCacheMap.put(redisKey, redisSortedSetCache);}}//处理缓存中存在的卖家数据,结果不存在,删除处理private void diffDeleteSellerCache(Map<Long, Map<String, ProductDetailDO>> productSellerCacheMap,Map<String, RedisSortedSetCache> redisSortedSetCacheMap, Function<Long, String> getRedisKeyFunction) {//当可售的列表中,不存在缓存已经存在的数据列表,说明缓存已经无效,需要删除该对应的key下的缓存信息if (!CollectionUtils.isEmpty(productSellerCacheMap)) {for (Map.Entry<Long, Map<String, ProductDetailDO>> entry : productSellerCacheMap.entrySet()) {Long key = entry.getKey();Map<String, ProductDetailDO> value = entry.getValue();if (CollectionUtils.isEmpty(value)) {continue;}String redisKey = getRedisKeyFunction.apply(key);RedisSortedSetCache redisSortedSetCache = new RedisSortedSetCache();redisSortedSetCache.setDeleteKey(Boolean.TRUE);//设置到需要处理得缓存对象中redisSortedSetCacheMap.put(redisKey, redisSortedSetCache);}}}...//新增缓存时需要获取每个缓存key存储的权重值public Double getSortedSetScore(String cacheKey) {//新增可采商品缓存时基于db的分段发号Long autoNo = segmentIDGen.genNewNo(cacheKey);return Double.valueOf(autoNo);}
}

(11)基于DB的分段发号器组件

一.号段内存缓冲组件SegmentBuffer

//号段内存缓冲组件
@Data
@Accessors(chain = true)
public class SegmentBuffer {//线程是否在运行中private final AtomicBoolean threadRunning;private final ReadWriteLock lock;private String bizTag;//双bufferprivate Segment[] segments;//当前的使用的segment的indexprivate volatile int currentPos;//下一个segment是否处于可切换状态private volatile boolean nextReady;//是否初始化完成private volatile boolean initOk;private volatile int step;private volatile int minStep;private volatile long updateTimestamp;public SegmentBuffer() {segments = new Segment[]{new Segment(this), new Segment(this)};currentPos = 0;nextReady = false;initOk = false;threadRunning = new AtomicBoolean(false);lock = new ReentrantReadWriteLock();}public Segment getCurrent() {return segments[currentPos];}public Lock rLock() {return lock.readLock();}public Lock wLock() {return lock.writeLock();}public int nextPos() {return (currentPos + 1) % 2;}public void switchPos() {currentPos = nextPos();}...
}
@Component
public class SegmentIDCache implements ApplicationListener<ContextRefreshedEvent> {private final Map<String, SegmentBuffer> cache = new ConcurrentHashMap<>();@Resourceprivate LeafAllocNoMapper leafAllocNoMapper;private volatile boolean initOk = false;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {checkAndInit();}//初始化数据private void checkAndInit() {if (!initOk) {synchronized (this) {if (!initOk) {log.info("Init ...");//确保加载到kv后才初始化成功updateCacheFromDb();initOk = true;log.info("Init Ok ...");}}}}public boolean isInitOk() {return initOk;}public boolean containsKey(String bizCode) {checkAndInit();return cache.containsKey(bizCode);}public SegmentBuffer getValue(String bizCode) {checkAndInit();return cache.get(bizCode);}//单个初始化加载public void updateCacheFromDb(String bizCode) {log.info("update cache from db");try {LeadAllocDO dbBizCodes = leafAllocNoMapper.findByBizTag(bizCode);if (Objects.isNull(dbBizCodes)) {return;}SegmentBuffer buffer = new SegmentBuffer();buffer.setBizTag(bizCode);Segment segment = buffer.getCurrent();segment.setValue(new AtomicLong(0));segment.setMax(0);segment.setStep(0);cache.put(bizCode, buffer);log.info("Add bizCode {} from db to IdCache, SegmentBuffer {}", bizCode, buffer);} catch (Exception e) {log.warn("update cache from db exception", e);} finally {log.info("updateCacheFromDb,cost:{}", 0);}}//更新缓存keyprivate void updateCacheFromDb() {log.info("update cache from db");try {List<String> dbBizCodes = leafAllocNoMapper.listAllBizTag();if (CollectionUtils.isEmpty(dbBizCodes)) {return;}List<String> cacheBiz = new ArrayList<>(cache.keySet());Set<String> insertBizSet = new HashSet<>(dbBizCodes);Set<String> removeBizSet = new HashSet<>(cacheBiz);//db中新加的tags灌进cachefor (String tmp : cacheBiz) {insertBizSet.remove(tmp);}for (String bizCode : insertBizSet) {SegmentBuffer buffer = new SegmentBuffer();buffer.setBizTag(bizCode);Segment segment = buffer.getCurrent();segment.setValue(new AtomicLong(0));segment.setMax(0);segment.setStep(0);cache.put(bizCode, buffer);log.info("Add bizCode {} from db to IdCache, SegmentBuffer {}", bizCode, buffer);}for (String tmp : dbBizCodes) {removeBizSet.remove(tmp);}for (String tag : removeBizSet) {cache.remove(tag);log.info("Remove tag {} from IdCache", tag);}} catch (Exception e) {log.warn("update cache from db exception", e);} finally {log.info("updateCacheFromDb,cost:{}", 0);}}
}

二.号段ID生成器组件SegmentIDGen

//号段ID生成器组件
@Service
public class SegmentIDGenImpl implements SegmentIDGen {//下一次异步更新比率因子public static final double NEXT_INIT_FACTOR = 0.9;//最大步长不超过100,0000private static final int MAX_STEP = 1000000;//默认一个Segment会维持的时间为15分钟//如果在15分钟内Segment就消耗完了,则步长要扩容一倍,但不能超过MAX_STEP//如果在超过15*2=30分钟才将Segment消耗完,则步长要缩容一倍,但不能低于MIN_STEP,MIN_STEP的值为数据库中初始的step字段值private static final long SEGMENT_DURATION = 15 * 60 * 1000L;//更新因子//更新因子=2时,表示成倍扩容或者折半缩容private static final int EXPAND_FACTOR = 2;private final ExecutorService threadPoolExecutor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new UpdateThreadFactory());@Autowiredprivate LeafAllocNoRepository leafAllocNoRepository;@Resourceprivate SegmentIDCache cache;//生成新的ID@Overridepublic Long genNewNo(String bizTag) {if (!cache.isInitOk()) {throw new RuntimeException("not init");}//如果没有,此时需要初始化一个if (!cache.containsKey(bizTag)) {leafAllocNoRepository.insertLeadAlloc(bizTag);cache.updateCacheFromDb(bizTag);}SegmentBuffer buffer = cache.getValue(bizTag);if (!buffer.isInitOk()) {synchronized (buffer) {if (!buffer.isInitOk()) {try {updateSegmentFromDb(bizTag, buffer.getCurrent());log.info("Init buffer. Update leafkey {} {} from db", bizTag, buffer.getCurrent());buffer.setInitOk(true);} catch (Exception e) {log.warn("Init buffer {} exception", buffer.getCurrent(), e);throw new RuntimeException("init error:" + bizTag);}}}}return getIdFromSegmentBuffer(buffer);}//从segment缓冲中获取idprivate Long getIdFromSegmentBuffer(SegmentBuffer buffer) {while (true) {buffer.rLock().lock();try {final Segment segment = buffer.getCurrent();if (!buffer.isNextReady() && (segment.getIdle() < NEXT_INIT_FACTOR * segment.getStep())&& buffer.getThreadRunning().compareAndSet(false, true)) {asyncUpdate(buffer);}long value = segment.getValue().getAndIncrement();if (value < segment.getMax()) {return value;}} finally {buffer.rLock().unlock();}//获取的value,大于max,则等待其他线程更新完毕。最多等待100swaitAndSleep(buffer);buffer.wLock().lock();try {final Segment segment = buffer.getCurrent();long value = segment.getValue().getAndIncrement();if (value < segment.getMax()) {return value;}if (buffer.isNextReady()) {buffer.switchPos();buffer.setNextReady(false);} else {log.error("Both two segments in {} are not ready!", buffer);throw new RuntimeException("next not ready");}} finally {buffer.wLock().unlock();}}}//异步更新初始化private void asyncUpdate(SegmentBuffer buffer) {long submitTime = System.currentTimeMillis();threadPoolExecutor.execute(() -> {long executeTime = System.currentTimeMillis();Segment next = buffer.getSegments()[buffer.nextPos()];boolean updateOk = false;try {updateSegmentFromDb(buffer.getBizTag(), next);updateOk = true;} catch (Exception e) {log.warn("{} updateSegmentFromDb exception", buffer.getBizTag(), e);} finally {long finishTime = System.currentTimeMillis();log.info("update segment {} from db {}。st:{}, et:{}, ft:{}", buffer.getBizTag(), next, submitTime, executeTime, finishTime);if (updateOk) {buffer.wLock().lock();buffer.setNextReady(true);buffer.getThreadRunning().set(false);buffer.wLock().unlock();} else {buffer.getThreadRunning().set(false);}}});}//自旋10000次之后,睡眠10毫秒private void waitAndSleep(SegmentBuffer buffer) {int roll = 0;while (buffer.getThreadRunning().get()) {roll += 1;if (roll > 10000) {try {TimeUnit.MILLISECONDS.sleep(10);break;} catch (InterruptedException e) {log.warn("Thread {} Interrupted", Thread.currentThread().getName());break;}}}}//从db中更新号段public void updateSegmentFromDb(String bizTag, Segment segment) {SegmentBuffer buffer = segment.getBuffer();LeadAllocDO leadAllocDO;if (!buffer.isInitOk()) {leadAllocDO = leafAllocNoRepository.updateMaxIdAndGet(bizTag);buffer.setStep(leadAllocDO.getStep());buffer.setMinStep(leadAllocDO.getStep());} else if (buffer.getUpdateTimestamp() == 0) {leadAllocDO = leafAllocNoRepository.updateMaxIdAndGet(bizTag);buffer.setUpdateTimestamp(System.currentTimeMillis());buffer.setStep(leadAllocDO.getStep());buffer.setMinStep(leadAllocDO.getStep());} else {int nextStep = calculateNextStep(bizTag, buffer);leadAllocDO = leafAllocNoRepository.updateMaxIdByDynamicStepAndGet(bizTag, nextStep);buffer.setUpdateTimestamp(System.currentTimeMillis());buffer.setStep(nextStep);buffer.setMinStep(leadAllocDO.getStep());}// must set value before set maxlong value = leadAllocDO.getMaxId() - buffer.getStep();segment.getValue().set(value);segment.setMax(leadAllocDO.getMaxId());segment.setStep(buffer.getStep());log.info("updateSegmentFromDb, bizTag: {}, cost:0, segment:{}", bizTag, segment);}//计算新的步长private int calculateNextStep(String bizCode, SegmentBuffer buffer) {long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();int nextStep = buffer.getStep();if (duration < SEGMENT_DURATION) {nextStep = Math.min(MAX_STEP, nextStep * EXPAND_FACTOR);} else if (duration < SEGMENT_DURATION * EXPAND_FACTOR) {// do nothing with nextStep} else {nextStep = Math.max(buffer.getMinStep(), nextStep / EXPAND_FACTOR);}log.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", bizCode, buffer.getStep(), String.format("%.2f", ((double) duration / (1000 * 60))), nextStep);return nextStep;}public static class UpdateThreadFactory implements ThreadFactory {private static int threadInitNumber = 0;private static synchronized int nextThreadNum() {return threadInitNumber++;}@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "Thread-Segment-Update-" + nextThreadNum());}}
}

3.定时同步可补商品

(1)时序图

(2)流程图

(3)代码实现

(1)时序图

(2)流程图

一.分⻚查询卖家组信息。每次处理完⼀批数据后,剩余数据会继续通过下⼀⻚来查询获取。

二.根据卖家组和商品的关系表查询出卖家组所⽀持的商品。查询商品与卖家组关系表sku_seller_relation,过滤掉⾮⾃营的商品。

三.根据查询返回的item列表批量查询得到商品的item信息。查询过程需要商品表item_info关联查询商品扩展属性表attribute_extend,然后填充屏蔽微仓的扩展JSON信息,并对该屏蔽的卖家ID进⾏过滤商品。

四.遍历商品检查商品状态是否为试销。商品状态:1准备上架、2试销上架、3上架、4预下架、5下架、6停售。如果是试销商品,则查询当前商品归属的仓是否有试销标识。如果不是试销商品或试销商品所属的仓没有试销标识,则进⾏过滤。

五.根据商品的存储条件对归属仓的存储条件进⾏匹配。只有商品的存储条件和归属仓的存储条件匹配了才允许补货,存储条件即sellerLableList卖家标签:1常温、2冷藏、3冷冻、4⽔产。

六.进⾏组套商品验证。先通过sku批量查询组套商品与SKU关系表stack_sku_relation,获取到每个sku下的原料以及普通商品信息,然后对每个商品进⾏试销判断以及存储条件处理,两者都满⾜才允许补货。

七.对已存在的缓存数据和当前这次同步处理后的数据进⾏差集diff处理。发⽣变化的数据才需要刷⼊缓存,比如新增的或者⽆效的数据。

八.构建缓存模型,对可补的商品信息进⾏缓存。key为'前缀标识+卖家ID',value为'可补的sku列表'。

(3)代码实现

@Component
public class CompensationJobHandler {@DubboReference(version = "1.0.0")private CompensationApi compensationApi;@XxlJob("syncCompensationProduct")public void syncCompensationProduct(CompensationRequest request) {XxlJobHelper.log("sync available product job starting...");JsonResult result = compensationApi.syncCompensationProduct(request);XxlJobHelper.log("sync available product job end, result:{}", result);}
}@DubboService(version = "1.0.0", interfaceClass = CompensationApi.class, retries = 0)
public class CompensationApiImpl implements CompensationApi {@Resourceprivate CompensationService compensationService;//同步可补商品@Overridepublic JsonResult syncCompensationProduct(CompensationRequest request) {try {return compensationService.syncCompensationProduct(request);} catch (ProductBizException e) {log.error("biz error: ", e);return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());} catch (Exception e) {log.error("system error: ", e);return JsonResult.buildError(e.getMessage());}}
}@Service
public class CompensationServiceImpl implements CompensationService {...//同步可补的数据入缓存@Overridepublic JsonResult syncCompensationProduct(CompensationRequest request) {//分页查询商品信息,处理商品可补的数据Integer pageNo = 1;List<SellerGroupResponse> sellerGroupResponses = querySellerGroupList(pageNo, request.getSellerGroupType());while (!CollectionUtils.isEmpty(sellerGroupResponses)) {//1.过滤卖家组的非有效状态信息数据List<SellerGroupResponse> sellerGroupResponseList = sellerGroupFilter(sellerGroupResponses);//2.根据卖家组获取卖家支持的可售商品列表List<SkuSellerRelationDO> sellerRelationDOList = queryAvailableProduct(sellerGroupResponseList);//3.查询商品信息,并过滤非自营的商品,填充关于商品的扩展信息List<ProductDetailDO> productDetailDOList = queryProductDetailList(sellerRelationDOList);//4.解析屏蔽微仓的扩展json,对屏蔽的卖家ID进行过滤,并将详情的商品sku信息绑定到卖家上List<ProductSellerRelationBO> sellerFilterList = sellerFilter(productDetailDOList, sellerRelationDOList, sellerGroupResponses);//5.普通商品过滤(试销品过滤,存储条件过滤)List<ProductSellerRelationBO> itemFilterList = itemFilter(sellerFilterList);//6.组套商品过滤(试销品过滤,存储条件过滤)List<ProductSellerRelationBO> suitFilterList = suitFilter(itemFilterList);//7.读取历史的缓存信息,对已经存在的缓存进行diff处理,处理新增或者修改的数据更新缓存diffRecoverableCache(suitFilterList, sellerGroupResponses);pageNo++;sellerGroupResponses = querySellerGroupList(pageNo, request.getSellerGroupType());}return JsonResult.buildSuccess();}...
}

4.定时同步可售商品

(1)时序图

(2)流程图

(3)代码实现

(1)时序图

(2)流程图

一.分页查询卖家组信息,调⽤查询卖家组信息接⼝,返回卖家组信息列表。

二.对返回的卖家组列表进行过滤,过滤无效状态的卖家组。

三.根据卖家组列表获取⽀撑这些卖家组所⽀持售卖的商品,通过商品与卖家组关系表sku_seller_relation查询出卖家组⽀持售卖的商品。查询时匹配字段relation_type关系类型,只获取可售商品。relation_type关系类型有:1可售、2屏蔽。

四.根据商品卖家库存关系表查询得到卖家组⽀持售卖的库存充⾜的商品。商品卖家库存关系表是sku_stock_seller_relation,查询时匹配库存数量stock_num > 0,且seller_type卖家类型是⾃营的商品。seller_type卖家类型有:1⾃营、2POP。

五.通过商品与卖家组关系表和商品卖家库存关系表构建卖家可售商品信息。商品与卖家组关系表是sku_seller_relation,商品卖家库存关系表是sku_stock_seller_relation。

六.根据商品属性扩展表查询到商品的属性内容进行过滤,也就是对商品中屏蔽卖家的扩展信息进⾏筛选。商品属性扩展表是attribute_extend,商品的属性内容表是attribute_content。

七.读取历史的缓存信息,对已经存在的缓存进⾏diff处理,需要处理新增或者修改的数据更新缓存。

八.构建缓存模型,对可售的商品数据进⾏缓存。卖家的缓存的模型对象为:key为'前缀标识+卖家ID',value为'可售skuId'。卖家类型的缓存模型对象为:key为'前缀标识+卖家类型',value为'可售skuId'。

(3)代码实现

@Component
public class AvailableJobHandler {@DubboReference(version = "1.0.0")private AvailableApi availableApi;@XxlJob("syncAvailableProduct")public void syncAvailableProduct() {XxlJobHelper.log("sync available product job starting...");JsonResult result = availableApi.syncAvailableProduct(null);XxlJobHelper.log("sync available product job end, result:{}", result);}
}@DubboService(version = "1.0.0", interfaceClass = AvailableApi.class, retries = 0)
public class AvailableApiImpl implements AvailableApi {@Resourceprivate AvailableService availableService;//同步可售商品@Overridepublic JsonResult syncAvailableProduct(AvailableRequest request) {try {Boolean success = availableService.syncAvailableProduct(request);return JsonResult.buildSuccess(success);} catch (ProductBizException e) {log.error("biz error: request={}", JSON.toJSONString(request), e);return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());} catch (Exception e) {log.error("system error: request={}", JSON.toJSONString(request), e);return JsonResult.buildError(e.getMessage());}}
}@Service
public class AvailableServiceImpl implements AvailableService {...//同步可售商品@Overridepublic Boolean syncAvailableProduct(AvailableRequest request) {//分页查询商品信息,处理商品可售的数据Integer pageNo = 1;//1.获取卖家组信息列表List<SellerGroupResponse> sellerGroupList = querySellerGroupList(pageNo, request.getSellerGroupType());//自营还是POP可售的skuId集合Map<Integer, List<String>> sellerTypeSkuIdMap = new HashMap<>(2);while (!CollectionUtils.isEmpty(sellerGroupList)) {//2.对卖家组状态非有效的进行过滤List<SellerGroupResponse> effectiveSellerGroupList = sellerGroupFilter(sellerGroupList);//3.根据卖家组列表获取卖家组所支持售卖的商品List<SkuSellerRelationDTO> saleProductList = querySaleProductList(effectiveSellerGroupList);//4.获取自营可售商品的且库存充足的信息Map<Long, String> sellerInStockMap = querySellerInStockProductMap(saleProductList);//5.构建卖家可售商品信息Map<Long, List<String>> sellerSaleProductMap = buildSellerAvailableProduct(sellerInStockMap, effectiveSellerGroupList, saleProductList);//6.对商品中屏蔽卖家的扩展信息进行筛选sellerSaleProductMap = sellerFilter(sellerSaleProductMap, saleProductList);//7.合并卖家类型的可售商品mergeSellerTypeSkuIdMap(sellerTypeSkuIdMap, saleProductList, sellerSaleProductMap);//8.读取历史的缓存信息,对已经存在的缓存进行diff处理,需要处理新增或者修改的数据更新缓存//构建数据模型,将卖家可售商品缓存起来 缓存结构 key:"前缀标识+卖家id",value:"可售商品"Map<String, RedisSortedSetCache> sortedSetSourceMap = diffAvailableCache(sellerSaleProductMap, sellerGroupList);//9.刷新缓存信息redisReadWriteManager.flushIncrSortedSetMap(sortedSetSourceMap);pageNo++;sellerGroupList = querySellerGroupList(pageNo, request.getSellerGroupType());}//10.将卖家类型可售商品缓存起来 缓存结构 key:"前缀标识+卖家类型",value:"可售商品",逻辑与卖家可售商品缓存一致//这里需要将分页查询的卖家组的可售商品合并起来,最后做已经存在的缓存diff处理Map<String, RedisSortedSetCache> sortedSetSourceMap = diffSellerTypeAvailableCache(sellerTypeSkuIdMap);//11.刷新缓存信息redisReadWriteManager.flushIncrSortedSetMap(sortedSetSourceMap);return Boolean.TRUE;}...
}

5.商品中心架构梳理

商品M端系统:eshop-construction-service
商品C端系统:eshop-diplomacy-service
商品生命周期系统:eshop-lifecycle-service
商品调度系统:eshop-scheduling-service(定时调度可采可补可售任务)
商品补货系统:eshop-replenished-service(处理商品可采可补可售)
商品卖家系统:eshop-seller-service

http://www.xdnf.cn/news/978643.html

相关文章:

  • 前端面试宝典---事件循环面试题
  • 小白学Pinia状态管理
  • STM32G DMA串口发送接收
  • Linux开发工具之VsCode(Filezila、MobaXterm、Vim三合一)
  • 【笔记】NVIDIA AI Workbench 中安装 cuDNN 9.10.2
  • 每日Prompt:人像写真
  • 内存泄漏系列专题分析之二十:camx swap内存泄漏实例分析
  • Babylon.js引擎(二)
  • 【Chipyard】 conda 环境安装与使用
  • k8s在节点上加污点
  • k8s 部署服务常见错误原因
  • Windows 安装 Maven
  • 1Panel 部署 OpenResty + Redis 实现 IP 动态封禁教程
  • 软考 系统架构设计师系列知识点之杂项集萃(87)
  • Visual Studio 2022 运行提示:未在本地计算机上注册“Microsoft.Jet.OLEDB.4.0”提供程序。
  • jsoncpp ubuntu编译问题
  • Proof of Talk专访CertiK联创顾荣辉:全周期安全方案护航Web3生态
  • Cilium动手实验室: 精通之旅---22.Cilium Traffic Optimization
  • OA协同平台有哪些功能?OA协同办公软件平台如何选择?
  • 腾讯开源 ovCompose 跨平台框架:实现一次跨三端(Android/iOS/鸿蒙)
  • 网络请求与本地存储:Axios 与 AsyncStorage 在 React Native 中的应用
  • 升级 Ubuntu Linux 内核的几种不同方法
  • 同步与异步:软件工程中的时空艺术与实践智慧-以蜻蜓hr人才系统举例-优雅草卓伊凡
  • 二刷苍穹外卖 day02
  • 2023蓝桥杯C/C++ B组国赛
  • PyTorch:让深度学习飞入寻常百姓家(从零开始玩转张量与神经网络!)
  • 开疆智能ModbusTCP转Canopen网关连接汇川PLC配置案例
  • 【android bluetooth 框架分析 04】【bt-framework 层详解 4】【AdapterState介绍】
  • 25-Oracle 23ai DBMS_SEARCH — Ubiquitous Search(无处不在的搜索)
  • Qt Connections详解:信号与槽的核心机制