首页 > 自考资讯 > 自考知识

mq的理解,mqmd

头条共创 2024-07-05

另外,为了避免消息队列服务器宕机导致消息丢失,成功发送到消息队列的消息会存储在消息生产者服务器上,只有在消息被消费者服务器实际处理后才会被删除。如果消息队列服务器出现故障,生产者服务器会选择分布式消息队列服务器集群中的其他服务器来发布消息。

除了发布订阅模式之外,消息队列还有其他发送模式。

点对点模型

基本模型只有一个发送者、一个接收者和一个分布式队列。

生产者-消费者模型

如果发送者和接收者都可以有多个部署实例,甚至可以是不同类型的,并且可以共享相同的队列,那么这将成为标准的生产者/消费者模型。在该模型中,这三个角色通常称为生产者、队列和消费者。

简介: 消息队列提高了系统的并发性和可扩展性。

2、使用消息队列时遇到哪些问题?

可用性降低:加入MQ之前,无需担心MQ服务器挂掉。部署MQ后,必须考虑可用性会降低的情况。复杂度增加:参与MQ后,必须保证消息不被重复消费,处理消息丢失,保证消息传递的顺序。因此,考虑的因素更多,系统也变得更复杂。数据一致性:消息队列提供的异步性确实提高了系统的响应能力,但是当消息的实际使用者没有正确使用它们时会发生什么?这可能会导致数据不一致。526240affc8548808608a0725c17f1f0~noop.image?_iz=58558&from=article.pc_detail&lk3s=953192f4&x-expires=1720760476&x-signature=fCprDscwno8a0ofx74cEiZmr0BM%3D 42a9e5564c4b4c32b355e334e4c05233~noop.image?_iz=58558&from=article.pc_detail&lk3s=953192f4&x-expires=1720760476&x-signature=%2B%2FHJPYk%2FgRWUDH%2Bkn%2FK41hDoI%2F8%3D

2.1 解决办法是什么?

由于可用性问题引入消息队列后,系统可用性下降。在实际项目中,会发送MQ消息。如果不使用集群,如果一台MQ机器出现故障宕机,那么当其中一台MQ出现故障时,MQ消息就无法发送,系统就会崩溃。其余MQ 计算机可以在生产环境中继续运行,无需独立消息队列。如果存在,你应该将其用于实际目的(技术有点复杂,所以你也可以作弊并额外收费)。解决这个问题需要深入了解不同的聚类技术。下面是一个ActiveMq 集群示例(Zookeeper+ActiveMq)。先看图。

197b6a6c4bc747059beaecff97ff6eac~noop.image?_iz=58558&from=article.pc_detail&lk3s=953192f4&x-expires=1720760476&x-signature=RMZ0HstIsgCpBeb8wkfiVdaaVGY%3D

eeper 删除临时节点。当服务器向Zookeeper 注册时,Zookeeper 会分配一个序列号。序列号较小的被视为“主”,序列号较大的被视为“备用”。

当客户端(通常是Web服务器)需要访问某个服务时,它会连接到Zookeeper,获取指定目录下的临时节点列表(注册的服务器信息),并使用必须获取的“主”服务器的地址。为后续访问操作提供较低的序列号。达到“永远访问主服务器”的目的。当“主”服务器发生故障时,ZooKeeper 会从指定目录中删除相应的临时节点。同时,您可以通知所有关心这一变化的客户,并高效、快速地传播此信息。当下一个请求到来时,它也会连接到zookeeper,但此时它实际上会访问备份MQ。

我们不会在这里解释如何配置集群。如果您在线搜索教程,您会发现很多。

关于复杂性问题1.如何防止消息被重复消费?

要回答这个问题,我们首先要知道为什么消息会被重复消费。大多数时候,这是因为确认信息没有发送到消息队列,消息队列不知道它已经消费和分发了消息。再次将消息发送给其他消费者。因此,解决问题的方法有3种。

当消息插入数据库并为该消息指定唯一的主键时,即使重复消费,也不会发生主键冲突,避免数据库出现脏数据。如果收到此消息并执行redis set操作,则无论设置多少次结果都相同,set操作被认为是幂等操作,无需解析。如果以上两种情况还不行,请准备第三方服务方记录消费情况。以redis为例,一条消息被分配一个全局ID,只要该消息被消费,该ID和消息就会以K-V格式写入redis。消费者开始消费之前,可以先检查Redis中是否有消费记录。 2、如何保证消息的可靠传输?

这个问题其实是第一个问题的延伸。这意味着您需要确保可靠的传输。这其实是为了防止生产者数据丢失、消息队列数据丢失、消费者数据丢失。

事实上,中间件开发者已经考虑到了这些问题,甚至提供了一些可配置文件,以便你可以自己设置相关参数。消息队列通常存储在磁盘上,因此您不必担心丢失生产者数据。MQ事务会被回滚,如果消费者丢失,消费信息将简单地改为手动确认,因为它通常使用自动确认消息模式。消费者消费完毕后调用MQ。只需检查方法即可。

3、如何保证从消息队列中取出的数据按顺序执行?

该算法将需要保持顺序的消息放入同一个消息队列中,并且仅使用一个消费者来消费该队列。

Rabbitmq:拆分多个队列,每个队列有一个消费者,只是多了一些队列,这绝对是一种痛苦。或者,只有一个队列,为一个消费者提供服务,并且该消费者在内部使用内存队列进行排队。然后分发到不同的Kafka底层工作进程(主题、分区、消费者、内部单线程消耗),写入N个内存队列,N个线程各消耗一个内存队列。 4. 如何解决延迟和过期问题。消息队列中有数百万条消息已经积压了几个小时。我该如何解决它们?

该问题是在生产环境中发生事故后出现的。消息队列延迟和过期是防止消息队列过度拥挤的自我保护机制。当然,保护也是可以关闭的,比如某条消息如果没有被消费5次,就会被丢弃,保护机制也不会关闭。那么问题来了,那些被丢弃的消息是不是就不需要了呢?事实上,事实并非如此。可以查询这个业务中一批丢失的数据,写一个临时程序一点点找到,补充到mq中,把丢失的数据补回来。

5. 数据是通过推送还是拉取的方式发送给消费者?各自的缺点是什么?

Push模型具有优秀的实时性,但由于状态维护等问题,Consumer的状态需要在Broker端维护,因此不适合实现消息中间件。在Broker 支持大量消费者的场景下,消费者的消费速率不一致,导致Broker 在推送时难以处理不同消费者的情况。该消息无法被消费者消费,因为不知道消费者的停机是短期的还是永久的。此外,推送消息(可能很大)也会增加消费者的负载。如果只有一个Consumer,使用push比pull更好。虽然pull模式比较容易实现,但实时性取决于轮训的频率,因此不适合在实时性要求较高的场景中使用。 3.如何使用MQ(以ActiveQM为例)

附官网:http://activemq.apache.org/

附启动服务访问地址:http://127.0.0.1:8161/admin/用户名/密码admin/admin

发布/订阅模型

制片人出版

public class JMSProducer { private static Final String USERNAME=ActiveMQConnection.DEFAULT_USER; //默认连接用户名private static Final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; //默认连接密码private static Final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; 10; //发送的消息数量public static void main(String[] args) { ConnectionFactory connectionFactory //接受连接会话; //消息的目的地MessageProducer messageProducer; factory connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); //通过连接工厂获取连接connection.start(); //Connection 启动一个session=connection.createSession (Boolean.TRUE, Session.AUTO_ACKNOWLEDGE) //创建一个会话//destination=session.createQueue( 'FirstQueue1'); //创建一个消息队列destination=session. createTopic('FirstTopic1'); sendMessage(session, messageProducer);发送消息session.commit () } catch (Exception e) { //TODO 自动生成的catch 块e .printStackTrace() }finally{ if(connection!=null){ try { connection .close(); catch (JMSException e) { //TODO 自动生成的catch 块e.printStackTrace(); public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{ for(int i=0;iJMSProducer.SENDNUM;i++){ TextMessage message=session.createTextMessage('ActiveMQ 发送的消息'+i); ('消息发送:'+'ActiveMQ发出的消息'+i);

消费者订阅

/** * 消息监听-订阅者一* @author Administrator * */public class Listenerimplements MessageListener{ @Override public void onMessage(Message message) { //TODO 自动生成的方法存根try { System.out.println('订阅者消息接收者: '+((TextMessage)message).getText()); } catch (JMSException e) { //TODO 自动生成的catch 块e.printStackTrace() }}

public class JMSConsumer { private static Final String USERNAME=ActiveMQConnection.DEFAULT_USER; //默认连接用户名private static Final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; //默认连接密码private static Final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; String[] args) { ConnectionFactory connectionFactory; //连接工厂Connection connection=null //会话从Destination 接收消息或向Destination 发送消息的线程MessageConsumer messageConsumer; //实例化连接工厂connectionFactory=new ActiveMQConnectionFactory(JMSConsumer) .USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); //通过连接获取连接。 Factory connection.start(); //启动连接session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE) //创建会话//destination=session.createQueue('FirstQueue1'); ('FirstTopic1'); messageConsumer=session.createConsumer(destination); //创建消息消费者messageConsumer.setMessageListener(new Listener()); //注册消息监听。 e) { //TODO 自动生成的catch 块e.printStackTrace(); } }} 一个好的分布式消息队列,高吞吐量、低延迟(根据场景而定)、透明发送、强扩展性我认为是必须的。有。具有容灾能力、一致的顺序交付、同步异步传输方式、完善的运维和监控工具,并且开源。

版权声明:本文转载于今日头条,版权归作者所有。如有侵权,请联系本站编辑删除。

猜你喜欢