MQ Source in Spark Structured Streaming(Spark结构化流媒体中的MQ源)
问题描述
我已经在Spark Structure Streaming中实现了MQ源代码。我正在使用IBM MQ Core库和Java 8
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq</artifactId>
<version>${mq.version}</version>
<scope>provided</scope>
问题是读取操作太慢。我正在运行我的流应用程序,有4个执行器,每个执行器4 GB内存,4个内核。但我的应用程序需要10分钟才能从队列中读取1000条消息。每条消息的大小约为3kb。
从现在起,我的Spark应用程序逐个读取消息并并行处理这些消息。
有没有一种方法可以让我们同时阅读这些消息。我听说过MQ中的并发消费者。但我找不到任何有关这方面的适当文件。
有人能用Java跟我分享一些例子吗?
由于我的组织限制,我无法共享任何代码。对此我深表歉意。但我可以提供任何需要的信息来提供一些帮助。
推荐答案
您是在与MQ队列管理器相同的服务器上运行应用程序,还是在远程服务器上运行应用程序?在循环获取下一条消息之前,您的应用程序是否正在做一些后端工作?您进行了哪些网络性能测试?可能您的网络速度很慢。 下面是一个简单的Java/MQ,它循环和检索队列上的所有消息。我只是将1000条3kb的消息放在队列中并运行它。以下是输出:但我的应用程序需要10分钟从队列中读取1000条消息。 每封邮件的大小约为3KB。
2018/06/28 17:18:21.191 MQTest12L: testReceive: successfully connected to MQWT1
2018/06/28 17:18:21.207 MQTest12L: testReceive: successfully opened TEST.Q1
2018/06/28 17:18:21.448 MQTest12L: testReceive: read 1000 messages
2018/06/28 17:18:21.448 MQTest12L: testReceive: closed: TEST.Q1
2018/06/28 17:18:21.450 MQTest12L: testReceive: disconnected from MQWT1
获取1000条3KB邮件花费了259毫秒。因此,如果您的程序需要10分钟才能收到1000条3kb的消息,则说明您的程序或您的网络有问题。
下面是一个功能齐全的Java/MQ应用程序,名为MQTest12L.java,它将循环并检索队列中的所有消息:
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
/**
* Program Name
* MQTest12L
*
* Description
* This java class will connect to a remote queue manager with the
* MQ setting stored in a HashTable, loop to retrieve all messages on a queue
* then close and disconnect.
*
* Sample Command Line Parameters
* -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
*
* @author Roger Lacroix
*/
public class MQTest12L
{
private static final SimpleDateFormat lOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
private Hashtable<String,String> params;
private Hashtable<String,Object> mqht;
private String qMgrName;
private String outputQName;
/**
* The constructor
*/
public MQTest12L()
{
super();
params = new Hashtable<String,String>();
mqht = new Hashtable<String,Object>();
}
/**
* Make sure the required parameters are present.
* @return true/false
*/
private boolean allParamsPresent()
{
boolean b = params.containsKey("-h") && params.containsKey("-p") &&
params.containsKey("-c") && params.containsKey("-m") &&
params.containsKey("-q") &&
params.containsKey("-u") && params.containsKey("-x");
if (b)
{
try
{
Integer.parseInt((String) params.get("-p"));
}
catch (NumberFormatException e)
{
b = false;
}
}
return b;
}
/**
* Extract the command-line parameters and initialize the MQ HashTable.
* @param args
* @throws IllegalArgumentException
*/
private void init(String[] args) throws IllegalArgumentException
{
int port = 1414;
if (args.length > 0 && (args.length % 2) == 0)
{
for (int i = 0; i < args.length; i += 2)
{
params.put(args[i], args[i + 1]);
}
}
else
{
throw new IllegalArgumentException();
}
if (allParamsPresent())
{
qMgrName = (String) params.get("-m");
outputQName = (String) params.get("-q");
try
{
port = Integer.parseInt((String) params.get("-p"));
}
catch (NumberFormatException e)
{
port = 1414;
}
mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
mqht.put(CMQC.PORT_PROPERTY, new Integer(port));
mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));
// I don't want to see MQ exceptions at the console.
MQException.log = null;
}
else
{
throw new IllegalArgumentException();
}
}
/**
* Connect, open queue, loop and get all messages then close queue and disconnect.
*
* @throws MQException
*/
private void testReceive()
{
MQQueueManager qMgr = null;
MQQueue queue = null;
int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF + CMQC.MQOO_INQUIRE + CMQC.MQOO_FAIL_IF_QUIESCING;
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING;
MQMessage receiveMsg = null;
int msgCount = 0;
boolean getMore = true;
try
{
qMgr = new MQQueueManager(qMgrName, mqht);
MQTest12L.logger("successfully connected to "+ qMgrName);
queue = qMgr.accessQueue(outputQName, openOptions);
MQTest12L.logger("successfully opened "+ outputQName);
while(getMore)
{
receiveMsg = new MQMessage();
try
{
// get the message on the queue
queue.get(receiveMsg, gmo);
msgCount++;
if (CMQC.MQFMT_STRING.equals(receiveMsg.format))
{
String msgStr = receiveMsg.readStringOfByteLength(receiveMsg.getMessageLength());
// MQTest12L.logger("["+msgCount+"] " + msgStr);
}
else
{
byte[] b = new byte[receiveMsg.getMessageLength()];
receiveMsg.readFully(b);
// MQTest12L.logger("["+msgCount+"] " + new String(b));
}
}
catch (MQException e)
{
if ( (e.completionCode == CMQC.MQCC_FAILED) &&
(e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) )
{
// All messages read.
getMore = false;
break;
}
else
{
MQTest12L.logger("MQException: " + e.getLocalizedMessage());
MQTest12L.logger("CC=" + e.completionCode + " : RC=" + e.reasonCode);
getMore = false;
break;
}
}
catch (IOException e)
{
MQTest12L.logger("IOException:" +e.getLocalizedMessage());
}
}
}
catch (MQException e)
{
MQTest12L.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
finally
{
MQTest12L.logger("read " + msgCount + " messages");
try
{
if (queue != null)
{
queue.close();
MQTest12L.logger("closed: "+ outputQName);
}
}
catch (MQException e)
{
MQTest12L.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
try
{
if (qMgr != null)
{
qMgr.disconnect();
MQTest12L.logger("disconnected from "+ qMgrName);
}
}
catch (MQException e)
{
MQTest12L.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
}
}
/**
* A simple logger method
* @param data
*/
public static void logger(String data)
{
String className = Thread.currentThread().getStackTrace()[2].getClassName();
// Remove the package info.
if ( (className != null) && (className.lastIndexOf('.') != -1) )
className = className.substring(className.lastIndexOf('.')+1);
System.out.println(lOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);
}
/**
* main line
* @param args
*/
public static void main(String[] args)
{
MQTest12L write = new MQTest12L();
try
{
write.init(args);
write.testReceive();
}
catch (IllegalArgumentException e)
{
System.err.println("Usage: java MQTest12L -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password");
System.exit(1);
}
System.exit(0);
}
}
这篇关于Spark结构化流媒体中的MQ源的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:Spark结构化流媒体中的MQ源
基础教程推荐
- 在螺旋中写一个字符串 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01