本文共 6342 字,大约阅读时间需要 21 分钟。
使用注解,可以更便捷的使用基于 Executor
框架的线程池。Spring Boot 框架中提供了一个 AsyncConfigurer
接口以及一个默认的 AsyncConfigurerSupport
实现。通过实现该接口,或继承该实现,可以自定义自己的线程池和异常处理操作。
@Configuration@EnableAsyncpublic class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(2000)); return executor; }}
使用 @EnableAsync
注解启动线程池,之后就可以使用 @Async
注解来标识需要在单独线程中执行的任务了。
@Servicepublic class AsyncServiceImpl implements AsyncService { @Override @Async public void doSomething() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("doSomething " + Thread.currentThread().getId() + " " + Thread.currentThread().getName()); }}
多系统之间的消息传递,可以通过读写公共文件、读写同一个数据库,但这些方式延迟高、锁操作频繁,并不适合高并发、高响应的业务场景。所以,基于生产者/消费者模式,Java 引入了消息服务(JMS,Java Message Service)。
消息服务的两种模式
常见消息中间件
ActiveMQ,由 Apache 软件基金会研发的纯 Java 程序,是一个操作系统支持 Java 虚拟机就可以运行的消息组件。现在有 Classic
和 Artemis
两个版本的,可以去。
要使用 ActiveMQ 中间件,需要添加如下依赖:
org.springframework.boot spring-boot-starter-activemq org.messaginghub pooled-jms
而后,配置中间件的地址、账户及密码等信息,便可以使用自动装配的 JmsTemplate
实例进行消息的发送,并用 @JmsListener
注解监听队列中接收到的消息。
spring.activemq.broker-url=tcp://localhost:61616spring.activemq.user=adminspring.activemq.password=adminspring.jms.pub-sub-domain=truespring.jms.template.default-destination=activemq.default.destinationspring.activemq.pool.enabled=truespring.activemq.pool.max-connections=5spring.activemq.packages.trusted=com.test.activemq.domain
在下载安装并启动创建的中间件时,注意本地硬盘空间使用率超过默认的 90% 时,便会阻塞消息的传递。
Kafka,是一种高吞吐量的分布式发布订阅消息系统,使用 Scala 和 Java 编写,可以在所需版本。
RabbitMQ,同其他消息中间件一样,该组件也支持 AMQP(Advance Message Queuing Protocol)消息协议,去后,在工程中引入下面的依赖便可以使用该中间件。
org.springframework.boot spring-boot-starter-amqp
之后,便可以对 RabbitMQ 进行相关的配置,使用 RabbitTemplate
实例进行消息的发送,使用 @RabbitListener
注解来监听队列中的消息,进行后续处理。另外实现 ConfirmCallback
接口来提供消息推送成功确认回调。
spring.rabbitmq.addresses=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.publisher-confirm-type=CORRELATED
RocketMQ,由阿里巴巴自研后捐赠给 Apache 基金会的分布式消息中间件,可以去所需版本。
在 Spring Boot 框架中开启定时任务十分简单,直接使用 @EnableScheduling
注解开启定时任务执行功能。如此,容器中的所有 Bean 都会被扫描,@Scheduled
注解标注的方法会被识别为定时任务。
WebSocket 是基于 TCP 协议的全双工通信协议,使得服务器可以主动向浏览器客户端发送消息。
在 Spring Boot 中,添加下面的依赖:
org.springframework.boot spring-boot-starter-websocket
然后使用 @ServerEndpoint
注解注册一个服务端点,该注解同 @RequestMapping
类似,会声明该端点的相对路径。添加端点注解后,还需要创建一个 ServerEndpointExporter
实例用来检测并注册他们。
@Service@ServerEndpoint("/websocket")public class WebSocketServiceImpl { private static CopyOnWriteArraySetwebsocketSet = new CopyOnWriteArraySet<>(); private Session session; //实际工程中应当将该方法单独放在一个配置类中 @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } @OnOpen public void open(Session session) { System.out.println("open"); this.session = session; websocketSet.add(this); } @OnMessage public void onMessage(String message, Session session) { try { for (Iterator iterator = websocketSet.iterator(); iterator.hasNext();) { WebSocketServiceImpl webSocketServiceImpl = iterator.next(); webSocketServiceImpl.sendMsg(message); } } catch (Exception e) { System.err.println(e); } } @OnClose public void close() { System.out.println("close"); websocketSet.remove(this); } @OnError public void onError(Session session, Throwable error) { error.printStackTrace(); } private void sendMsg(String message) throws IOException { this.session.getBasicRemote().sendText(message); }}
之后便可以在支持该协议的浏览器网页建立与服务器的连接,并进行双方的通信。连接相关端点的地址同普通接口访问类似,只是协议名不是 http
或 https
而是 ws
。
var websocket = nullif ('WebSocket' in window) { websocket = new WebSocket("ws://localhost:8090/my-spring-boot/websocket")} else { alert("not support websocket")}websocket.onerror = function () { alert("error")}websocket.onclose = function () { alert("close")}websocket.onopen = function () { alert("open")}websocket.onmessage = function (event) { alert(event.data)}window.onbeforeunload = function () { websocket.close()}function closeWebsocket() { websocket.close()}function sendMessage() { let message = $('#message').val(); websocket.send(message)}
并不是所有的浏览器都支持 WebSocket 协议,所以为了兼容它们,可以使用子协议 STOMP(Simple or Streaming Text Orientated Messaging Protocol)。
首先需要使用 @EnableWebSocketMessageBroker
注解启用 STOMP 协议,Spring Boot 会加载自动配置 WebSocketMessagingAutoConfiguration
,当然,也可以自己实现 WebSocketMessageBrokerConfigurer
接口来注册需要的端点。
@Configuration@EnableWebSocketMessageBrokerpublic class StompConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/stomp-message").withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/subscribe-message"); registry.setApplicationDestinationPrefixes("/stomp-request"); }}
在自定义配置中注册两个服务端点,并设置订阅和请求路径的前缀。其中 withSockJS()
方法表示支持第三方 SockJS
框架,在使用时应引入下面的库:
而后,便可以创建连接,发送消息或订阅消息:
/** /my-spring-boot 是服务的上下文路径,/stomp-message 是定义的端点路径 */let socket = new SockJS('/my-spring-boot/stomp-message')var stompClient = Stomp.over(socket)stompClient.connect({}, function () { /** /subscribe-message 是订阅消息时的前缀,/plain 是订阅消息的相对路径 */ stompClient.subscribe('/subscribe-message/plain', function (data) { alert(data) })})/** /stomp-request 是进行消息操作时的前缀,/send-message 是相对路径 */stompClient.send("/stomp-request/send-message",{},message)
/stomp-request/send-message
路径便会请求到服务中的标注为 @MessageMapping("/send-message")
的方法。该方法可以使用 @SendTo
标注消息转发的路径,还可以在方法中将消息转发给其他消息中间件。
@Controllerpublic class StompController { @Resource private RabbitMqService rabbitMqService; @MessageMapping("/send-message") @SendTo("/subscribe-message/plain") public String sendMsg(String message) { System.out.println(message); rabbitMqService.sendMsg(message); return message; } }
转载地址:http://itdws.baihongyu.com/