Spring Redis Stream consumer stops consuming messages (Address already in use)

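I'm trying out Redis Streams with spring-data-redis. I have implemented two applications: the first adds records to a stream, and the second consumes messages from that stream.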

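It works, but after a while (usually after more than 80,000 messages have been processed) the consumer application throws an exception: "org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379".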

I have installed Redis 6.2.1 on Ubuntu (Windows 10 Subsystem for Linux) and run it with the default configuration (port 6379). The same thing happens when I run Redis in Docker.

I am testing Redis performance, so messages are added in a loop with no delay. I add messages to the stream like this:

ObjectRecord<String, String> record = StreamRecords.newRecord()
                .ofObject(message)
                .withStreamKey("my-stream");

redisTemplate.opsForStream()
                .add(record);

I consume messages like this:

StreamReceiver.StreamReceiverOptions<String, MapRecord<String, String, String>> options =
                StreamReceiver.StreamReceiverOptions.builder()
                        .pollTimeout(Duration.ofSeconds(2))
                        .build();

StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory, options);
// Read from the same stream key the producer writes to ("my-stream").
Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("my-stream"));

messages.subscribe(new StreamSubscriber());

StreamSubscriber is just my implementation of org.reactivestreams.Subscriber. I also tried the StreamMessageListenerContainer approach, but the result is the same.

org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
    at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.translateException(LettuceConnectionFactory.java:1553)
    at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.lambda$getConnectionAsync$0(LettuceConnectionFactory.java:1491)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at io.lettuce.core.DefaultConnectionFuture.lambda$null$0(DefaultConnectionFuture.java:257)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
    at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
    at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:143)
    at io.lettuce.core.DefaultConnectionFuture.lambda$thenCompose$1(DefaultConnectionFuture.java:254)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at io.lettuce.core.AbstractRedisClient.lambda$initializeChannelAsync0$4(AbstractRedisClient.java:405)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
    at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
    at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
    ... 30 more
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
    at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:78)
    at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:56)
    at io.lettuce.core.RedisClient.lambda$transformAsyncConnectionException$20(RedisClient.java:767)
    at io.lettuce.core.DefaultConnectionFuture.lambda$thenCompose$1(DefaultConnectionFuture.java:253)
    ... 22 more
Caused by: java.util.concurrent.CompletionException: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
    ... 20 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

RedisTemplate and LettuceConnectionFactory are created and auto-configured by Spring Boot (I also tried some custom configuration, without success).

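What I also observed is that the Redis connection is closed after a message is sent to the stream, and a new connection is created when the next message is to be sent.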

2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] sk.kedros.learn.camel.redis.Publisher    : sending message: {"obuSn":"eeee00a5-616e-46ad-80e4-50780fab1336","lon":0.0,"lat":0.0,"data":"99761 - provider1"}
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] o.s.d.redis.core.RedisConnectionUtils    : Fetching Redis Connection from RedisConnectionFactory
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] io.lettuce.core.RedisChannelHandler      : dispatching command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] i.lettuce.core.protocol.DefaultEndpoint  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, epid=0x1] write() writeAndFlush command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] i.lettuce.core.protocol.DefaultEndpoint  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, epid=0x1] write() done
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] write(ctx, AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandEncoder  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379] writing command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Received: 22 bytes, 1 commands in the stack
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Stack contains: 1 commands
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] i.l.core.protocol.RedisStateMachine      : Decode done, empty stack: true
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Completing command AsyncCommand [type=XADD, output=StatusOutput [output=1618125131455-2, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] o.s.d.redis.core.RedisConnectionUtils    : Closing Redis Connection.

This also affects the Redis connection of the consumer application: its Redis connection is opened and closed periodically.

2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] onStreamMessage(MapBackedRecord{recordId=1618125112425-0, kvMap={payload={"obuSn":"d066bdd7-21b5-46f0-81ca-09afe4fd6596","lon":0.0,"lat":1.0,"data":"68541 - provider1"}}}): Emitting item, slow-path
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.RedisStateMachine      : Decode done, empty stack: true
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] Completing command SubscriptionCommand [type=XREAD, output=StreamReadOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisChannelHandler      : closeAsync()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] i.lettuce.core.protocol.DefaultEndpoint  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, epid=0x3bdf] closeAsync()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] onComplete()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] scheduleIfRequired()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] scheduleIfRequired(): Activating subscription
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] scheduleIfRequired(): Activating subscription, offset ReadOffset(offset=1618125112425-0)
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisClient              : Trying to get a Redis connection for: redis://localhost
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisClient              : Resolved SocketAddress localhost:6379 using redis://localhost
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.AbstractRedisClient      : Connecting to Redis at localhost:6379
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelInactive()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-8] io.lettuce.core.protocol.CommandHandler  : [channel=0x87e269ec, [id: 0x6e14c57c] (inactive), chid=0x3be0] channelRegistered()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.lettuce.core.protocol.DefaultEndpoint  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, epid=0x3bdf] deactivating endpoint handler
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelInactive() done
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.ConnectionWatchdog     : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, last known addr=localhost/127.0.0.1:6379] channelInactive()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.ConnectionWatchdog     : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, last known addr=localhost/127.0.0.1:6379] Reconnect scheduling disabled
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelUnregistered()
2021-04-11 09:11:52.432 DEBUG 21636 --- [ioEventLoop-4-8] io.lettuce.core.AbstractRedisClient      : Connecting to Redis at localhost:6379: localhost:6379

io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:na]

As I mentioned, it works fine for a while, but it eventually fails with "Address already in use: no further information".
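One possible reading of the logs above, offered as an unconfirmed hypothesis rather than a diagnosis: the consumer opens a new connection for each poll (closeAsync() followed by "Trying to get a Redis connection"), and the local ports in the logs (51322, 65535) sit in the dynamic range 49152 to 65535. If closed connections hold their local port for a while in TIME_WAIT, a fast loop could run out of local ports, which would surface as a BindException on connect. The sketch below is only back-of-the-envelope arithmetic in plain Java. The class name PortBudget, the port range and the 120-second TIME_WAIT are assumptions for illustration, not values measured on this machine.

```java
// Back-of-the-envelope model of ephemeral-port exhaustion (plain Java, no Redis involved).
final class PortBudget {

    /**
     * Highest rate of short-lived outbound connections that can be sustained
     * when every closed connection keeps its local port reserved for timeWaitSeconds.
     */
    static long maxSustainedConnectionsPerSecond(int ephemeralPorts, int timeWaitSeconds) {
        if (ephemeralPorts <= 0 || timeWaitSeconds <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        return ephemeralPorts / timeWaitSeconds; // integer division rounds down
    }

    public static void main(String[] args) {
        int ports = 65535 - 49152 + 1; // ASSUMPTION: dynamic port range 49152..65535
        int timeWait = 120;            // ASSUMPTION: 120 s TIME_WAIT; check the actual OS setting
        System.out.println(maxSustainedConnectionsPerSecond(ports, timeWait));
    }
}
```

Under these assumed numbers the budget is on the order of a hundred or so new connections per second, which a no-delay loop that reconnects per message could plausibly exceed. If that is what is happening, reusing one connection would matter more than anything on the Redis server side.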

Am I missing something? Is it a Redis installation/configuration problem? Or a problem with the Lettuce client? Or something else?

stack overflow Spring Redis Stream consumer stops consuming messages (Address already in use)

Answer:


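Your Redis server has probably crashed because it ran out of memory. Allocate more memory to the server, or cap the stream length with MAXLEN so that the Redis server's memory usage stays bounded.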
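To make the MAXLEN idea concrete, here is a minimal plain-Java model of a length-capped stream. It does not talk to Redis. CappedStream and its methods are hypothetical names used only for illustration. In Redis itself the cap would be applied with XADD ... MAXLEN or XTRIM ... MAXLEN, which likewise keep the newest entries and evict the oldest.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java model of a length-capped stream; hypothetical class, no Redis involved.
final class CappedStream {
    private final Deque<String> entries = new ArrayDeque<>();
    private final int maxLen;

    CappedStream(int maxLen) {
        if (maxLen <= 0) {
            throw new IllegalArgumentException("maxLen must be positive");
        }
        this.maxLen = maxLen;
    }

    /** Appends an entry, then evicts the oldest entries until at most maxLen remain. */
    void add(String entry) {
        entries.addLast(entry);
        while (entries.size() > maxLen) {
            entries.removeFirst();
        }
    }

    int size() {
        return entries.size();
    }

    /** Returns the oldest retained entry, or null if the stream is empty. */
    String oldest() {
        return entries.peekFirst();
    }

    public static void main(String[] args) {
        CappedStream stream = new CappedStream(1000);
        for (int i = 0; i < 100_000; i++) {
            stream.add("message-" + i);
        }
        // However many messages are added, only the newest 1000 are retained.
        System.out.println(stream.size() + " entries, oldest = " + stream.oldest());
    }
}
```

The point of the model is that memory use depends on the cap rather than on how many messages the producer has sent, which is what the suggestion above relies on.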