1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
/**
 * Receives a message from the destination with the given name, applying no
 * message selector.
 * <p>Delegates to {@link #receiveSelected(String, String)} with a {@code null}
 * selector.
 * @param destinationName the name of the destination to receive from
 * @return whatever the delegate returns (may be {@code null})
 * @throws JmsException propagated from the delegate
 */
@Override
@Nullable
public Message receive(String destinationName) throws JmsException {
    // A null selector means "do not filter".
    final String noSelector = null;
    return receiveSelected(destinationName, noSelector);
}
/**
 * Receives a message from the named destination, optionally filtered by a
 * message selector.
 * <p>Runs a session callback through {@code execute(..., true)}: the callback
 * first resolves {@code destinationName} against the session and then hands the
 * resolved destination and the selector to {@code doReceive}.
 * @param destinationName the name of the destination to resolve and receive from
 * @param messageSelector the selector expression, or {@code null} for none
 * @return the value produced by the callback (may be {@code null})
 * @throws JmsException propagated from {@code execute}
 */
@Override
@Nullable
public Message receiveSelected(final String destinationName, @Nullable final String messageSelector) throws JmsException {
    // NOTE(review): the trailing 'true' is passed straight to execute();
    // presumably it requests a started connection -- confirm against execute().
    return execute(
            session -> doReceive(session, resolveDestinationName(session, destinationName), messageSelector),
            true);
}
/**
 * Creates a consumer for the given destination and selector, then receives a
 * message through it.
 * <p>Delegates the actual receive (and the consumer cleanup) to
 * {@link #doReceive(Session, MessageConsumer)}.
 * @param session the session to create the consumer on
 * @param destination the destination to consume from
 * @param messageSelector the selector expression, or {@code null} for none
 * @return the received message (may be {@code null})
 * @throws JMSException if creating the consumer or receiving fails
 */
@Nullable
protected Message doReceive(Session session, Destination destination, @Nullable String messageSelector)
        throws JMSException {
    MessageConsumer consumer = createConsumer(session, destination, messageSelector);
    return doReceive(session, consumer);
}
/**
 * Receives a single message from the given consumer and then closes that
 * consumer.
 * <p>Steps, in order:
 * <ol>
 * <li>Start from the configured receive timeout; if a {@code JmsResourceHolder}
 * is bound for the connection factory and reports a timeout, use the smaller
 * of the two values.</li>
 * <li>Receive via {@code receiveFromConsumer}.</li>
 * <li>If the session is transacted, commit only when it is locally transacted;
 * otherwise, if the session uses client acknowledgement and a message was
 * received, acknowledge it.</li>
 * </ol>
 * The consumer is closed in a {@code finally} block, so it is released even if
 * any of the steps above throws.
 * @param session the session the consumer belongs to
 * @param consumer the consumer to receive from; always closed by this method
 * @return the received message (may be {@code null})
 * @throws JMSException if receiving, committing or acknowledging fails
 */
@Nullable
protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {
    try {
        // Start with the configured receive timeout.
        long effectiveTimeout = getReceiveTimeout();

        // Look up the JMS resource holder bound for our connection factory, if any.
        ConnectionFactory factory = getConnectionFactory();
        JmsResourceHolder holder = (factory != null
                ? (JmsResourceHolder) TransactionSynchronizationManager.getResource(factory)
                : null);

        // Use the transaction timeout (if available) when it is the smaller value.
        // NOTE(review): Math.min assumes "smaller means shorter wait"; if the receive
        // timeout can carry sentinel values (e.g. 0 or negative for special modes),
        // those would win here -- confirm against getReceiveTimeout()'s contract.
        if (holder != null && holder.hasTimeout()) {
            effectiveTimeout = Math.min(effectiveTimeout, holder.getTimeToLiveInMillis());
        }

        // Fetch the actual message.
        Message received = receiveFromConsumer(consumer, effectiveTimeout);

        boolean transacted = session.getTransacted();
        if (!transacted) {
            // Non-transacted session: manually acknowledge the message, if any,
            // when the session is in client-acknowledge mode.
            if (isClientAcknowledge(session) && received != null) {
                received.acknowledge();
            }
        }
        else if (isSessionLocallyTransacted(session)) {
            // Commit necessary - but avoid a commit call within a JTA transaction:
            // only a transacted session created by this template gets committed here.
            JmsUtils.commitIfNecessary(session);
        }
        return received;
    }
    finally {
        JmsUtils.closeMessageConsumer(consumer);
    }
}
|