From 43404089ad2fc1b6a21abd061e3d682394b8de9e Mon Sep 17 00:00:00 2001 From: HTHou Date: Fri, 12 Jun 2026 10:02:36 +0800 Subject: [PATCH 1/2] Expose client remote address in connect intercept --- .../io/moquette/broker/MQTTConnection.java | 2 +- .../java/io/moquette/broker/PostOffice.java | 5 ++- .../interception/BrokerInterceptor.java | 8 +++- .../io/moquette/interception/Interceptor.java | 6 +++ .../messages/InterceptConnectMessage.java | 21 ++++++++++ .../interception/BrokerInterceptorTest.java | 41 +++++++++++++++++++ 6 files changed, 79 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index a7930c860..e7cf18135 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -396,7 +396,7 @@ public void operationComplete(ChannelFuture future) throws Exception { setupInflightResender(channel); } - postOffice.dispatchConnection(msg); + postOffice.dispatchConnection(msg, remoteAddress()); LOG.trace("dispatch connection: {}", msg); } } else { diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index 3c2771036..b53cbc7e4 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -42,6 +42,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.StandardCharsets; @@ -1124,8 +1125,8 @@ private static boolean validateContentTypeAsUTF8(MqttPublishMessage msg) { * notify MqttConnectMessage after connection established (already pass login). * @param msg */ - void dispatchConnection(MqttConnectMessage msg) { - interceptor.notifyClientConnected(msg); + void dispatchConnection(MqttConnectMessage msg, InetSocketAddress remoteAddress) { + interceptor.notifyClientConnected(msg, remoteAddress); } void dispatchDisconnection(String clientId,String userName) { diff --git a/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java b/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java index 89c0cefea..adf0e205e 100644 --- a/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java +++ b/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java @@ -28,6 +28,7 @@ import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -111,10 +112,15 @@ public void stop() { @Override public void notifyClientConnected(final MqttConnectMessage msg) { + notifyClientConnected(msg, null); + } + + @Override + public void notifyClientConnected(final MqttConnectMessage msg, final InetSocketAddress remoteAddress) { for (final InterceptHandler handler : this.handlers.get(InterceptConnectMessage.class)) { LOG.debug("Sending MQTT CONNECT message to interceptor. CId={}, interceptorId={}", msg.payload().clientIdentifier(), handler.getID()); - executor.execute(() -> handler.onConnect(new InterceptConnectMessage(msg))); + executor.execute(() -> handler.onConnect(new InterceptConnectMessage(msg, remoteAddress))); } } diff --git a/broker/src/main/java/io/moquette/interception/Interceptor.java b/broker/src/main/java/io/moquette/interception/Interceptor.java index 7055bceae..6e49c90f9 100644 --- a/broker/src/main/java/io/moquette/interception/Interceptor.java +++ b/broker/src/main/java/io/moquette/interception/Interceptor.java @@ -22,6 +22,8 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import java.net.InetSocketAddress; + /** * This interface is to be used internally by the broker components. *

@@ -36,6 +38,10 @@ public interface Interceptor { void notifyClientConnected(MqttConnectMessage msg); + default void notifyClientConnected(MqttConnectMessage msg, InetSocketAddress remoteAddress) { + notifyClientConnected(msg); + } + void notifyClientDisconnected(String clientID, String username); void notifyClientConnectionLost(String clientID, String username); diff --git a/broker/src/main/java/io/moquette/interception/messages/InterceptConnectMessage.java b/broker/src/main/java/io/moquette/interception/messages/InterceptConnectMessage.java index e1cde5267..699aec61a 100644 --- a/broker/src/main/java/io/moquette/interception/messages/InterceptConnectMessage.java +++ b/broker/src/main/java/io/moquette/interception/messages/InterceptConnectMessage.java @@ -18,19 +18,40 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage; +import java.net.InetSocketAddress; +import java.util.Optional; + public class InterceptConnectMessage extends InterceptAbstractMessage { private final MqttConnectMessage msg; + private final InetSocketAddress remoteAddress; public InterceptConnectMessage(MqttConnectMessage msg) { + this(msg, null); + } + + public InterceptConnectMessage(MqttConnectMessage msg, InetSocketAddress remoteAddress) { super(msg); this.msg = msg; + this.remoteAddress = remoteAddress; } public String getClientID() { return msg.payload().clientIdentifier(); } + public Optional getRemoteAddress() { + return Optional.ofNullable(remoteAddress); + } + + public String getClientAddress() { + return remoteAddress == null ? null : remoteAddress.getHostString(); + } + + public int getClientPort() { + return remoteAddress == null ? -1 : remoteAddress.getPort(); + } + public boolean isCleanSession() { return msg.variableHeader().isCleanSession(); } diff --git a/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java b/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java index 916f9eefb..9963478ca 100644 --- a/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java +++ b/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java @@ -28,11 +28,16 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import java.net.InetSocketAddress; import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.refEq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -123,6 +128,42 @@ public void testNotifyClientConnected() throws Exception { assertEquals(40, n.get()); } + @Test + public void testNotifyClientConnectedIncludesRemoteAddress() throws Exception { + final CountDownLatch notified = new CountDownLatch(1); + final AtomicReference intercepted = new AtomicReference<>(); + final BrokerInterceptor localInterceptor = new BrokerInterceptor( + Collections.singletonList(new AbstractInterceptHandler() { + @Override + public String getID() { + return "RemoteAddressObserver"; + } + + @Override + public void onConnect(InterceptConnectMessage msg) { + intercepted.set(msg); + notified.countDown(); + } + + @Override + public void onSessionLoopError(Throwable error) { + throw new RuntimeException(error); + } + })); + + try { + final InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 12345); + localInterceptor.notifyClientConnected(MqttMessageBuilders.connect().build(), remoteAddress); + + assertTrue(notified.await(1, TimeUnit.SECONDS)); + assertEquals(remoteAddress, intercepted.get().getRemoteAddress().get()); + assertEquals("127.0.0.1", intercepted.get().getClientAddress()); + assertEquals(12345, intercepted.get().getClientPort()); + } finally { + localInterceptor.stop(); + } + } + @Test public void testNotifyClientDisconnected() throws Exception { interceptor.notifyClientDisconnected("cli1234", "cli1234"); From 1fe4b3db6e11d7b3d525200ac35e522eebb2dfe3 Mon Sep 17 00:00:00 2001 From: HTHou Date: Fri, 12 Jun 2026 17:25:36 +0800 Subject: [PATCH 2/2] Expose channel in connect intercept --- .../io/moquette/broker/MQTTConnection.java | 2 +- .../java/io/moquette/broker/PostOffice.java | 6 ++--- .../interception/BrokerInterceptor.java | 8 +++---- .../io/moquette/interception/Interceptor.java | 5 ++-- .../messages/InterceptConnectMessage.java | 24 ++++++++++++++----- .../interception/BrokerInterceptorTest.java | 8 ++++++- 6 files changed, 35 insertions(+), 18 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index e7cf18135..063e4cdec 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -396,7 +396,7 @@ public void operationComplete(ChannelFuture future) throws Exception { setupInflightResender(channel); } - postOffice.dispatchConnection(msg, remoteAddress()); + postOffice.dispatchConnection(msg, channel); LOG.trace("dispatch connection: {}", msg); } } else { diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index b53cbc7e4..e3e684656 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -25,6 +25,7 @@ import io.moquette.interception.BrokerInterceptor; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessageBuilders; @@ -42,7 +43,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.StandardCharsets; @@ -1125,8 +1125,8 @@ private static boolean validateContentTypeAsUTF8(MqttPublishMessage msg) { * notify MqttConnectMessage after connection established (already pass login). * @param msg */ - void dispatchConnection(MqttConnectMessage msg, InetSocketAddress remoteAddress) { - interceptor.notifyClientConnected(msg, remoteAddress); + void dispatchConnection(MqttConnectMessage msg, Channel channel) { + interceptor.notifyClientConnected(msg, channel); } void dispatchDisconnection(String clientId,String userName) { diff --git a/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java b/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java index adf0e205e..1aefff6d3 100644 --- a/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java +++ b/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java @@ -23,12 +23,12 @@ import io.moquette.broker.subscriptions.Subscription; import io.moquette.metrics.MetricsProvider; import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -112,15 +112,15 @@ public void stop() { @Override public void notifyClientConnected(final MqttConnectMessage msg) { - notifyClientConnected(msg, null); + notifyClientConnected(msg, (Channel) null); } @Override - public void notifyClientConnected(final MqttConnectMessage msg, final InetSocketAddress remoteAddress) { + public void notifyClientConnected(final MqttConnectMessage msg, final Channel channel) { for (final InterceptHandler handler : this.handlers.get(InterceptConnectMessage.class)) { LOG.debug("Sending MQTT CONNECT message to interceptor. CId={}, interceptorId={}", msg.payload().clientIdentifier(), handler.getID()); - executor.execute(() -> handler.onConnect(new InterceptConnectMessage(msg, remoteAddress))); + executor.execute(() -> handler.onConnect(new InterceptConnectMessage(msg, channel))); } } diff --git a/broker/src/main/java/io/moquette/interception/Interceptor.java b/broker/src/main/java/io/moquette/interception/Interceptor.java index 6e49c90f9..12f07e412 100644 --- a/broker/src/main/java/io/moquette/interception/Interceptor.java +++ b/broker/src/main/java/io/moquette/interception/Interceptor.java @@ -19,11 +19,10 @@ import io.moquette.interception.messages.InterceptAcknowledgedMessage; import io.moquette.broker.subscriptions.Subscription; import io.moquette.interception.messages.InterceptExceptionMessage; +import io.netty.channel.Channel; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; -import java.net.InetSocketAddress; - /** * This interface is to be used internally by the broker components. *

@@ -38,7 +37,7 @@ public interface Interceptor { void notifyClientConnected(MqttConnectMessage msg); - default void notifyClientConnected(MqttConnectMessage msg, InetSocketAddress remoteAddress) { + default void notifyClientConnected(MqttConnectMessage msg, Channel channel) { notifyClientConnected(msg); } diff --git a/broker/src/main/java/io/moquette/interception/messages/InterceptConnectMessage.java b/broker/src/main/java/io/moquette/interception/messages/InterceptConnectMessage.java index 699aec61a..12752c120 100644 --- a/broker/src/main/java/io/moquette/interception/messages/InterceptConnectMessage.java +++ b/broker/src/main/java/io/moquette/interception/messages/InterceptConnectMessage.java @@ -16,6 +16,7 @@ package io.moquette.interception.messages; +import io.netty.channel.Channel; import io.netty.handler.codec.mqtt.MqttConnectMessage; import java.net.InetSocketAddress; @@ -24,32 +25,43 @@ public class InterceptConnectMessage extends InterceptAbstractMessage { private final MqttConnectMessage msg; - private final InetSocketAddress remoteAddress; + private final Channel channel; public InterceptConnectMessage(MqttConnectMessage msg) { this(msg, null); } - public InterceptConnectMessage(MqttConnectMessage msg, InetSocketAddress remoteAddress) { + public InterceptConnectMessage(MqttConnectMessage msg, Channel channel) { super(msg); this.msg = msg; - this.remoteAddress = remoteAddress; + this.channel = channel; } public String getClientID() { return msg.payload().clientIdentifier(); } + public Optional getChannel() { + return Optional.ofNullable(channel); + } + public Optional getRemoteAddress() { - return Optional.ofNullable(remoteAddress); + if (channel != null && channel.remoteAddress() instanceof InetSocketAddress) { + return Optional.of((InetSocketAddress) channel.remoteAddress()); + } + return Optional.empty(); } public String getClientAddress() { - return remoteAddress == null ? null : remoteAddress.getHostString(); + return getRemoteAddress() + .map(InetSocketAddress::getHostString) + .orElse(null); } public int getClientPort() { - return remoteAddress == null ? -1 : remoteAddress.getPort(); + return getRemoteAddress() + .map(InetSocketAddress::getPort) + .orElse(-1); } public boolean isCleanSession() { diff --git a/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java b/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java index 9963478ca..9b391e5c5 100644 --- a/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java +++ b/broker/src/test/java/io/moquette/interception/BrokerInterceptorTest.java @@ -21,6 +21,7 @@ import io.moquette.broker.subscriptions.Topic; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubscriptionOption; @@ -37,10 +38,12 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.refEq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class BrokerInterceptorTest { @@ -153,9 +156,12 @@ public void onSessionLoopError(Throwable error) { try { final InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 12345); - localInterceptor.notifyClientConnected(MqttMessageBuilders.connect().build(), remoteAddress); + final Channel channel = mock(Channel.class); + when(channel.remoteAddress()).thenReturn(remoteAddress); + localInterceptor.notifyClientConnected(MqttMessageBuilders.connect().build(), channel); assertTrue(notified.await(1, TimeUnit.SECONDS)); + assertSame(channel, intercepted.get().getChannel().get()); assertEquals(remoteAddress, intercepted.get().getRemoteAddress().get()); assertEquals("127.0.0.1", intercepted.get().getClientAddress()); assertEquals(12345, intercepted.get().getClientPort());