X
首页 数据库相关软件开发 调用和使用MQ 函数
调用和使用MQ 函数

调用 MQ 函数的代码

使用 IDS WebSphere MQ 的功能,向 WebSphere MQ 队列发送消息或从中接收消息都很简单,如清单 1 所示:


清单 1. 在 SQL 和存储过程中使用 Informix MQ 函数
select MQSend("CreditService", customerid || ":" || address ||  ":" || product ":" || orderid) 
|-------10--------20--------30--------40--------50--------60--------70--------80--------9|
|-------- XML error:  The previous line is longer than the max of 90 characters ---------|
from   order_tab
where customerid = 1234;
insert into shipping_tab(shipping_msg) values(MQReceive());
create function get_my_order() returns int;
define  cust_msg lvarchar(2048);
define  customerid  char(12);
define  address     char(64);
define  product     char(12);
define  corderid    char(12);
define  snd_status  int;
-- Get the order from Order entry application.
execute function  MQReceive("OrderQueue")  into cust_msg;
let customerid = substr(cust_msg, 1, 12);
let  address    = substr(cust_msg, 14, 77);
let  product    = substr(cust_msg, 79, 90);
let  corderid    = substr(cust_msg, 92, 103);
insert into shipping_table(custid, addr, productid, orderid)
                Values(customerid, address, product, corderid);
-- send the status to CRM application
execute function MQSend("CRMQueue", corderid || ":IN-SHIPPING") into snd_status;
return 1;
end function;

调用 MQ 函数的代码

当回滚清单 2 中的事务时,接收到的消息被恢复到队列 bookorder 中,这一行将从 shipping_tab 中删除。


清单 2. 包含 MQ 函数的多语句事务
begin work;
insert into shipping_tab(shipping_msg) values (MQReceive("bookorderservice"));
rollback work;  -- Undo previous statement including WMQ operation.

在 IDS 应用程序中使用 MQ 函数

IDS 提供了用于暴露 WebSphere MQ 提供的每个接口 —— 读取、接收、发送、发布、订阅和取消订阅 —— 的函数,以及用于发送和接收大型消息的函数。WebSphere MQ 函数可以在任何可使用函数的地方进行调用:值子句、投影列表、查询过滤器、存储过程和触发器。此外,借助 IDS,可以将一个 WebSphere MQ 队列映射为一个 IDS 表。这个表上的插入操作将被翻译为到 WebSphere MQ 的发送操作,选择操作则被翻译为读取或接收操作。

基础

WebSphere MQ 提供队列及其操作的简单抽象,每种操作带有一些选项,例如消息到期时间和重试计数。IDS 将这些选项抽象为服务和策略。

服务:将队列、队列管理器和消息的代码集映射为服务。表 "informix".mqiservice 存储这种映射。IDS.DEFAULT.SERVICE 被映射为系统缺省队列管理器、名为 IDS.DEFAULT.QUEUE 的队列和缺省代码集。

策略:策略定义每种操作的诸如优先级、到期日期之类的属性。表 "informix".mqipolicy 存储每种策略的 37 个属性。IDS.DEFAULT.POLICY 是缺省策略。取决于您的应用程序环境,需要创建一个或多个策略。

相关 ID:当多个应用程序共享相同的队列时,可以使用一个最长为 48 字节的相关 ID 来控制交互。一旦应用程序就它们的消息的相关 ID 达成一致,它们就可以获得与相关 ID 匹配的消息。其工作方式类似于过滤器或 SQL 查询中的谓词。相关 ID 不是强制要求的,也没有缺省值。


表 1. IDS 中的 MQ 函数
函数名描述
MQSend()将一条字符串消息发送到一个队列
MQSendClob()将 CLOB 数据发送到一个队列
MQRead()将队列中的一条字符串消息读取到 IDS 中,但是不从队列中将其删除
MQReadClob()将队列中的一个 CLOB 读取到 IDS 中,但是不从队列中将其删除
MQReceive()将队列中的一条字符串消息接收到 IDS 中,并将其从队列中删除
MQReceiveClob()将队列中的一个 CLOB 接收到 IDS 中,并将其从队列中删除
MQSubscribe()订阅一个主题
MQUnSubscribe()取消订阅之前订阅的主题
MQPublish()()发布一条消息到一个主题中
MQPublishClob()发布一个 CLOB 到一个主题中
CreateMQVTIRead()创建一个 read Virtual Table Interface (VTI) 表,并将它映射到一个队列
CreateMQVTIReceive()创建一个 receive VTI 表,并将它映射到一个队列
MQTrace()跟踪 MQ 函数的执行
MQVersion()获得 MQ 函数的版本

用于从 IDS 发送到 WebSphere MQ 的函数


清单 3. MQSend 和 MQSendClob 函数
MQSend(Service, Service_Policy, Message, CorrelationID);
MQSendClob(Service, Service_Policy, ClobMessage, CorrelationID);

可以将最长为 32739 字节的消息发送到 WebSphere MQ 队列。要发送更大的消息,可以使用 CLOB 数据类型和 MQSendClob() 函数。MQSendClob() 的行为与 MQSend() 一致,惟一的区别是它以 CLOB 作为消息参数,而不是以字符类型作为参数。Message 和 ClobMessage 是必需的参数。IDS 使用保存在 "informix".mqiservice 表中的 Service 记录项中的策略将消息发送到队列管理器所管理的队列中。

发送函数的参数解释

四个参数都被指定。当给出了 4 个参数时,翻译起来就很简单。MQSend(serviceparam, policyparam, messageparam, correlationparam) 照样执行即可。

下面是缺少一个或多个参数的情况下的翻译:

  • MQsend(messageparam) 被翻译为:MQSend("IDS.DEFAULT.SERVICE", "IDS.DEFAULT_POLICY", messageparam, NULL);
  • MQsend(messageparam) 被翻译为:MQSend("IDS.DEFAULT.SERVICE", "IDS.DEFAULT_POLICY", messageparam, NULL);
  • MQsend(serviceparam, policyparam, messageparam) 被翻译为:MQSend(serviceparam, policyparam, messageparam, NULL);

清单 4. 发送函数示例
select MQSend("myservice", "mypolicy", orderid || ":" || address)
FROM    tab
Where   orderid = 12345;

所有 WebSphere MQ 函数都应该在一个事务中运行。在 IDS 中,SQL 语句 SELECT、UPDATE、DELETE 和 INSERT 自动开始一个新事务。或者,可以用 BEGIN WORK 语句开始一个新事务。

如果直接执行函数,则会产生一个错误。例如:

execute function MQSend("MyService", "<order><id>5</id><custid>6789</custid></order>");

IDS 不会隐式地为 EXECUTE 语句开始一个新事务。因此,必须显式地开始一个事务:


清单 5. 显式事务
begin work;
execute function MQSend("MyService", "<order><id>5</id><custid>6789</custid></order>");
commit work;

如果事务被回滚,那么 WebSphere MQ 上的所有操作也随之回滚,就像 IDS 回滚它的更改一样。


清单 6. 带回滚的事务
begin work;
insert into resultstab(sendval) value(MQSend("MyService", "<order><id><5</id><custid>6789
      </custid></order>"));
rollback work;



用于从 WebSphere MQ 读取和接收消息的 IDS 函数


清单 7. 读取和接收函数
MQRead(Service, Policy, CorrelationID) returns lvarchar;
MQReadClob(Service, Policy, CorrelationID) returns CLOB;
MQReceive(Service, Policy, CorrelationID) returns lvarchar;
MQReceiveClob(Service, Policy, CorrelationID) returns CLOB;

读取操作从队列获取消息,但是不从队列中删除消息。接收操作从队列中删除消息,并获取该消息。这些函数调用时可以带 0 个或多个参数。参数的解释类似于前面的 MQSend()。接收函数的事务行为类似于 MQSend。

MQRead() 和 MQReceive() 可以返回至多 32739 个字节。消息本身的最大大小是一个 WebSphere MQ 配置参数。更大的消息应该以 CLOB 的形式读取或接收。对于 MQ,一条消息就是一条消息。根据长度的不同,IDS 对消息加以区分,将消息映射到不同数据类型。

如果给定了一个相关 ID,那么 WebSphere MQ 获取队列中具有匹配相关 ID 的下一条消息。否则,返回 NULL 消息。当队列上没有适用的消息时,由策略决定等待时间。因此,通过使用预定义的相关 ID,多个应用程序可以为不同的目的共享相同的队列。


清单 8. SQL 中的读取和接收函数
select mqread("SHIPING.SERVICE","My.DEFAULT.POLICY") from systables where tabid = 1;
select mqreceive("SHIPING.SERVICE","My.DEFAULT.POLICY") from systables where tabid = 1;



发布和订阅函数


清单 9. 发布和订阅函数
MQPublish(publisher_name, policyparam, message, topic, correlationid);
MQPublishClob(publisher_name, policyparam, clob_param, topic, correlationid);
MQSubscribe(subscriber_name, policy_name, topic);
MQUnsubscribe(subscriber_name, policy_name, topic);

对一个队列的发布和订阅,对于在多个应用程序之间基于不同主题交换信息来说是一种有效的配置。当订单输入应用程序需要与信用卡应用程序、发货应用程序、CRM 应用程序和合作伙伴应用程序交互时,订单输入应用程序将订单发布到一个队列上,并且只需要发布这一次。然后,目标应用程序可以订阅这个队列,并使用读取或接收函数获得消息。在这种机制中,WebSphere MQ 还支持将消息归入到一些主题中,以便于进行更细致的控制。例如,订单输入消息可以将订单分为书籍、电子和服饰等主题。

为发布和定义主题,必须对队列进行配置。WebSphere MQ 允许静态或动态地定义主题。除了队列管理器外,提供发布和订阅特性的 Message Broker 也必须运行。Message Broker 组件提供消息路由和消息转换,使业务集成变得更容易。

订阅一个主题,并指定要从中接收消息的队列。当发布者将一条属于那个主题的消息插入到该队列时,WebSphere MQ 中介将消息路由到每个指定订阅者的队列。订阅者通过读取或接收函数从队列获取消息。

"informix".mqipubsub 表:在使用发布和订阅服务之前,必须建立这个表。请参阅 The Informix dynamic server documentation, its schema, and examples。

发布者的名称和订阅者的名称必须在 "informix".mqipubsub 表中定义。其他参数在前面已经讨论过。


清单 10. SQL 中的发布和订阅函数
select MQSubscribe(‘WeatherChannel’,"Weather") from systables where tabid = 1;
select mqPublish("WeatherChannel",
"<weather><zip>94501</zip><date>7/27/2006</date>
<high>89</high><low>59</low></weather>","Weather")
from systables where tabid = 1;
select mqreceive("WeatherChannel","Weather")
from systables where tabid = 1;



实用函数

MQVersion() 返回 IDS 中的 WebSphere MQ blade 的当前版本。通过 MQTrace(trace_level, trace_file) 可以跟踪 WebSphere MQ 函数的执行路径和 IDS 与 MQ 之间的交互。跟踪级别介于 10 到 50 之间 —— 是 10 的倍数。


清单 11. 跟踪输出示例
14:19:38 Trace ON level : 50
14:19:47  >>ENTER : mqSend<<
14:19:47    status:corrid is null
14:19:47  >>ENTER : MqOpen<<
14:19:47   status:MqOpen @ build_get_mq_cache()
14:19:47  >>ENTER : build_get_mq_cache<<
14:19:47   status:build_get_mq_cache @ mi_get_database_info()
14:19:47   status:build_get_mq_cache @ build_mq_service_cache()
14:19:47  >>ENTER : build_mq_service_cache<<
14:19:47  <<EXIT : build_mq_service_cache>>



MQ 表映射函数

在 IDS 中调用 WebSphere MQ 函数比较容易,但是并不是使用 MQ 的最容易的方式。IDS 可以将一个 WebSphere MQ 队列映射到一个 IDS 表。在表上执行一条 SELECT 语句就可以取得队列中的消息,而执行表上的 INSERT 语句就可以发送消息。用法如下所示。其他操作,例如 UPDATE 和 DELETE,是不允许在这种表上执行的。


清单 12. 表到队列映射函数
MQCreateVtiRead(readtable, servicename, policy, maxMessage)  
MQCreateVtiReceive(receivetable, servicename, policy, maxMessage)

read 表上的 SELECT 操作的行为与 MQRead() 类似。它取得消息,但是不会将消息从队列中删除。而 receive 表上的 SELECT 操作在取走消息的同时,还从队列中删除消息。maxMessage 参数决定列的长度,同时还决定列的类型。当长度为正数时,创建一个长度为 maxMessage 的 lvarchar 列。可以定义的消息的最大长度为 32607。如果使用 -1 作为 maxMessage,则可以检索 CLOB 类型的消息,如果使用 -2 作为 maxMessage,则可以检索 BLOB 类型的消息。


清单 13. MQ 表到队列映射函数
-- Create a READ table with max message length 4096.
execute function MQCreateVTIREAD("myreadtable",  "myservice", "mypolicy", 4096);
-- Below is the table created by MQCreateVTIREAD() function. 
create table myreadtab 
 ( msg      lvarchar(4096),
    correlid varchar(24),
   topic    varchar(40),
   qname    varchar(48),
   msgid    varchar(12),
   msgformat varchar(8))
using "informix".mq (SERVICE = "myservice", POLICY = "mypolicy", ACCESS = "READ");
-- Get the top 10 messages from the queue.
SELECT first 10 * from myreadtable;
-- INSERT a message into the table
INSERT into myreadtable values("IBM:81.98;Volume:1020");
-- SELECT the first message matching correlation id
SELECT FIRST 1 * from myreadtable where correlid = 'abc123';
IDS is aware of correlation id predicate and sends the correlation id request to MQ.  
WMQ matches to correlation ID and sends the matched message.

-- create a table to transport BLOB data.
execute function MQCreateVTIRECEIVE("mydoctable",  "myservice", "mypolicy", -2);

-- Below is the table created by MQCreateVTIRECEIVE() function.
create table mydoctable
 ( msg      BLOB,
    correlid varchar(24),
   topic    varchar(40),
   qname    varchar(48),
   msgid    varchar(12),
   msgformat varchar(8))
using "informix".mq (SERVICE = "myservice", POLICY = "mypolicy", ACCESS = "RECEIVE");
execute function MQCreateVTIREAD("myreadtable",  "myservice", "mypolicy", 4096);

execute function MQCreateVTIREAD("myreadtable", "myservice", "mypolicy", 4096);
INSERT into mydoctable(msg) select blobcol from ordertab;
-- insert using blob, get through blob
insert into mydoctable(msg) values(filetoblob("/etc/passwd", "client"));
select lotofile(msg, '/tmp/blob.dat','client') from mydoctable;