前言
接上篇,[架构设计]有关线下多台机器并发上传签到数据的设计架构[RocketMQ],本篇博客将进行代码实现及部分细节分析
本篇博客将
- 完成业务逻辑的落地实现
- 编码
- 补上一些其他业务上的细节
本篇博客将不会
- 教你怎么部署分布式应用
- 教你构建EurekaServer集群
- 教你并发测试代码
- 教你JPA
- 实现单元测试
- 保证RocketMQ消息的时效性
- 把代码讲的太细,基本都是贴一下,自行李姐
开始
先贴一下github库地址,大家可以先看看代码,因为我有部分代码不会放到博客里讲:
来回忆一下上篇的博客的架构设计↓
简化一下,并且补上剩余的部分↓
其中:
黑色外框是我们需要实现的部分:
- EurekaServer
- Gateway
- 考勤数据预处理
- 考勤局处理
灰色外框是我们不需要实现的部分:
- 前端请求
- 消息中间件RocketMQ
- 数据库
虚线链接的是需要注册到Eureka的组件
先来简单的
数据库配置:
这里我就不放sql文件了,因为真的很简单,大家手动建一下就行
EurekaServer的单机配置:
spring.application.name=eureka-server
server.port=11800
eureka.instance.hostname=localhost
eureka.client.fetch-registry=false
eureka.client.register-with-eureka=false
#每ms剔除续约到期的服务
eureka.server.eviction-interval-timer-in-ms= 5000
#关闭eureka自我保护机制
eureka.server.enable-self-preservation= false
eureka.client.serviceUrl.defaultZone=http://localhost:${server.port}/eureka/
EurekaServer的集群配置:
application-eureka-1.properties
spring.profiles=eureka-1
spring.application.name=eureka-server
server.port=11800
eureka.instance.hostname=eureka-1
eureka.client.fetch-registry=true
eureka.client.register-with-eureka=true
eureka.client.serviceUrl.defaultZone=http://eureka-2:11801/eureka/,http://eureka-3:11802/eureka/
application-eureka-2.properties
spring.profiles=eureka-2
spring.application.name=eureka-server
server.port=11801
eureka.instance.hostname=eureka-2
eureka.client.fetch-registry=true
eureka.client.register-with-eureka=true
eureka.client.serviceUrl.defaultZone=http://eureka-2:11800/eureka/,http://eureka-3:11802/eureka/
application-eureka-3.properties
spring.profiles=eureka-3
spring.application.name=eureka-server
server.port=11802
eureka.instance.hostname=eureka-3
eureka.client.fetch-registry=true
eureka.client.register-with-eureka=true
eureka.client.serviceUrl.defaultZone=http://eureka-1:11800/eureka/,http://eureka-2:11801/eureka/
Gateway API网关
因为用的配置文件的形式实现转发,所以不贴Bean类型的配置了
server.port=12810
spring.application.name=gateway-api
#
#spring.cloud.gateway.discovery.locator.enabled=true
eureka.client.serviceUrl.defaultZone=http://localhost:11800/eureka/,http://localhost:11801/eureka/,http://localhost:11802/eureka/
#logging.level.org.springframework.cloud.gateway=debug
#10s没有收到心跳就剔除服务
eureka.instance.lease-expiration-duration-in-seconds=10
#每4s发送一次心跳
eureka.instance.lease-renewal-interval-in-seconds=4
#每隔五秒获取服务列表
eureka.client.registry-fetch-interval-seconds=5
#这里重要!
#用于转发到签到预处理程序的路由
spring.cloud.gateway.routes[0].id=PRE-ATTEND
spring.cloud.gateway.routes[0].uri=lb://PRE-ATTEND
spring.cloud.gateway.routes[0].predicates[0]= Path=/api/**
spring.cloud.gateway.routes[0].filters[0]= StripPrefix=1
MODEL
MODEL就不贴了,很简单的,看代码就行
预处理程序-PRE_ATTEND
看一下预处理程序的业务↓
我们需要实现:
- 接受并处理签到请求
- 将处理完的消息发送到
RocketMQ
PreAttendController.java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import xyz.xy718.ResultBean;
import xyz.xy718.pre_attend.service.RocketMQService;
import javax.annotation.Resource;
/**
* 签到预处理
* @author: Xy718
* @date: 2020/8/3 11:23
* @description: 打卡签到预处理程序
*/
@RestController
@RequestMapping("/attend")
@Slf4j
public class PreAttendController {
@Value("${server.port}")
String port;
@Resource
RocketMQService rocketMQService;
/**
* 根据卡号提交记录
* @return
*/
@PostMapping("/code")
public ResultBean attendOfCode(
@RequestParam("code")String code
){
//在这里对业务进行预处理,比如给签到消息加上timestamp
//或者是获取签到用户的类型,比如老师啊,学生啊,督导之类的
//具体看业务实现,如果单纯的加个code到消息中间件甚至不需要预处理程序
log.info("签到:"+code);
SendResult sendResult = null;
try {
sendResult = rocketMQService.sendMsg(code+":"+System.currentTimeMillis()) ;
} catch (Exception e) {
e.printStackTrace();
return ResultBean.error("签到失败,请重试"+e.getMessage());
}
return ResultBean.success(sendResult) ;
}
}
application.properties
server.port=13100
spring.application.name=pre-attend
spring.mvc.servlet.load-on-startup=1
server.tomcat.uri-encoding=utf-8
eureka.instance.hostname=localhost
eureka.client.serviceUrl.defaultZone=http://localhost:11800/eureka/,http://localhost:11801/eureka/,http://localhost:11802/eureka/
#10s没有收到心跳就剔除服务
eureka.instance.lease-expiration-duration-in-seconds=10
#每4s发送一次心跳
eureka.instance.lease-renewal-interval-in-seconds=4
#rocketMQ配置
rocketmq.producer.isOnOff=on
# 服务地址
rocketmq.producer.namesrvAddr=localhost:9876
# 消息最大长度 默认1024*4(4M)
rocketmq.producer.maxMessageSize=4096
# 发送消息超时时间,默认3000
rocketmq.producer.sendMsgTimeout=3000
# 发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2
# 配置 Group Topic Tag
rocket.group= SOME_APPLICATION_GROUP
rocket.topic= ATTEND_MQ_TOPIC
rocket.tag= pre_attend
有关预处理程序
的另外部分请看代码:
- 消息生产者 ProducerConfig.java
- 消息发布Service RocketMQService.java
到此预处理部分的代码就完事了,也很简单,就是对签到人的信息做一点处理然后发送到RocketMQ
我的代码部分将其写为:123123123:timestamp
的形式,是为了多次签到时在中间件中的每个记录都有效,即便多次消费也能保证幂等性,不至于同样ID的消息留的过久与下次签到时重复而不知道哪条是哪条
(因为每45分钟一节课,你要保证上节课没被消费的消息在下节课消费时是唯一的)
比如在0815
时上课,学生J在教室1101
的班牌打卡上课(第一节课)
那么在上课前的0813
时打卡,预处理程序往消息中间件推送了一条stu01J:timestamp1
来表示学生J在这个时间签到了
如果消息在0814
或者之前被正常消费了,那么很好,很正常的。
如果消息在下节课被消费呢?比如在0909
,也就是下节课上课之前被消费了呢?
这个时候学生J在1102教室
又打了一次卡(因为是第二节课了呀)
预处理程序往消息中间件推送了一条stu01J:timestamp2
即便这两条消息被同时或者不同时被消费,学生J的打卡记录都是正常的,因为预处理程序带上了一个timestamp,那么在后面的签到处理程序消费消息时记录的数据会以timestamp为主,这也是为什么需要预处理程序的原因
所有的业务都是不同的,大家要自己分析来进行业务实现哦
然后是签到消费程序-ATTEND_RECORDER
我们根据当前的业务环节理一下思路~
就是这样。我们依靠多次消费消息以及在写入数据前判断消息是否已存在来保证消息的可靠性和幂等性
这里不用太在意性能情况,因为是分布式的,性能不够再扩展一台就行了(歪理
具体代码
这里仅展示两部分,就是消息订阅到之后的监听以及Service,其他部分大家看下github上的代码就行了
RocketMsgListener
消息监听:
package xyz.xy718.attend_recorder.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import xyz.xy718.attend_recorder.service.AttendService;
import javax.annotation.Resource;
import java.util.List;
import java.util.Optional;
/**
* 签到消息监听
* @author: Xy718
* @date: 2020/8/4 17:42
* @description:
*/
@Slf4j
@Component
public class RocketMsgListener implements MessageListenerConcurrently{
@Value("${attend.topic}")
public String rocketTopic ;
@Resource
AttendService attendService;
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> list
, ConsumeConcurrentlyContext consumeConcurrentlyContext
) {
//检测订阅列表是否为空
if (CollectionUtils.isEmpty(list)){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//获取订阅得到的消息
MessageExt messageExt = list.get(0);
String msg=new String(messageExt.getBody());
log.info("订阅到了{}个签到人:{}",list.size(),msg);
//签到!
Optional<Boolean> attend=attendService.goAttend(msg.split(":")[0],msg.split(":")[1]);
if(attend.isPresent()){
if(attend.get()){
//存在,且通过
log.info("同学:{}签到成功",msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
int reConsume = messageExt.getReconsumeTimes();
// 消息已经重试了3次,如果不需要再次消费,则返回成功
if(reConsume >=3){
log.info("消息消费上限");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 消息消费并没有成功
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
AttendService
也就是签到的数据处理:
package xyz.xy718.attend_recorder.service;
import org.springframework.stereotype.Service;
import xyz.xy718.attend_recorder.dao.repository.AttendRecordRepository;
import xyz.xy718.attend_recorder.model.AttendRecord;
import javax.annotation.Resource;
import java.util.Optional;
/**
* @author: Xy718
* @date: 2020/8/7 14:55
* @description:
*/
@Service
public class AttendService {
@Resource
AttendRecordRepository arRepo;
public Optional<Boolean> goAttend(String code,String timeStamp){
int coded;
long timeStamped;
try {
//parsing
coded=Integer.parseInt(code);
timeStamped=Long.parseLong(timeStamp);
} catch (NumberFormatException e) {
e.printStackTrace();
return Optional.ofNullable(false);
}
//查找当前签到目标ID与签到时间是否已有登记
Optional<AttendRecord> isAlready=arRepo.findByTargetIdAndRecordTime(coded,timeStamped);
if(isAlready.isPresent()){
//如果有(重复)就返回签到成功
return Optional.ofNullable(true);
}
//如果没有签到,就登记签到
AttendRecord record=arRepo.save(new AttendRecord(coded,0,timeStamped));
if(record!=null){
return Optional.ofNullable(true);
}
return Optional.ofNullable(false);
}
}
代码也很简单,基本都是对业务的简单实现~
ATTEND_RECORDER的配置部分大家看github就行,就是一些jpa和rocketmq的配置
至此代码的部分就结束了~
(就这啊。。。。。
接下实际测试的启动顺序为:
Eureka注册中心->GatewayAPI网关->PRE_ATTEND预处理服务->ATTEND_RECORDER签到登记服务
首先切换到项目目录下,然后cd 到各个子项目(这里maven应该有在父项目打包各个子项目的功能,暂时不研究,节省时间)
cd EUREKA_SERVER
mvn clean package
cd ../GATEWAY
mvn clean package
cd ../PRE_ATTEND
mvn clean package
cd ../ATTEND_RECORDER
mvn clean package
这里的MODEL项目是不需要打包的,因为她是其他项目的依赖,我们需要在打包其他项目之前将父项目 mvn install
就可以了
这里有疑问的话私信我或者留言即可
然后依照注册顺序启动:
java -jar EUREKA_SERVER/target/eureka_server-0.0.1-SNAPSHOT.jar
java -jar GATEWAY/targetgateway-0.0.1-SNAPSHOT.jar
java -jar PRE_ATTEND/target/pre_attend-0.0.1-SNAPSHOT.jar
java -jar ATTEND_RECORDER/target/attend_recorder-0.0.1-SNAPSHOT.jar
启动即可,然后可以访问
http://localhost:11800 来查看服务注册情况
如果需要横向扩展服务,只需要在启动命令后面加上 --server.port=XXXX
改变端口号就行了
这里有疑问的话私信我或者留言也即可
都成功后就是测试并发啦~
这里因为我不是很专业,所以用的是Python3自己简单写了个脚本,同时启动两个程序,每个程序3500个请求,来模拟前端的签到请求
这是代码:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import requests
import json
import threading
import time
i = 0
# 开启线程数目
tasks_number = 3500
accessCount=0;
class myThread (threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self)
self.name = name
def run(self):
this_str="start :"+self.name
## print (this_str)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36',
'Content-Type': 'application/json; charset=UTF-8',
}
url = 'http://localhost:12810/api/attend/code?code='+str(i)
try:
r = requests.post(url, headers)
result = json.loads(str(r.text))
# print(str(result['code']))
except Exception as e:
print(e)
this_str="end :"+self.name
print (this_str)
try:
print('启动')
time1 = time.clock()
while i < tasks_number:
t_name = i+1
t = myThread(str(t_name))
t.start()
i +=1
time2 = time.clock()
times = time2 - time1
print('耗时:'+ str(times)+'s')
except Exception as e:
print(e)
其中的http://localhost:12810/api/attend/code?code=xxx
就是前端的请求接口了,用Gateway做了转发,所以是Gateway的端口~
大家有更好的测试方式也可以自己试试哦
完
没写完整哈哈哈哈哈其实最后的测试结果部分应该放出来的,
但是由于电脑的问题,手头上没有多出来的服务器拿来做并发测试,所以搁置了,大家可以自己测试一下~
do yourself;be yourself;