我需要为另一个应用程序(客户端)公开一个TCP服务器接口,这是我的大学用于处理某些研究的程序。我使用Netty来完成任务。而且在大多数情况下都可以正常工作。 但是,当客户端发送大块字符串(例如一百万个或更多字符)时,就会出现问题。 在通道处理程序(下面的代码)中, 服务器接收的字符串很重要,它不包含换行符。 我的Netty服务器,这很简单: 我的管道很简单: 处理程序: 客户端未使用Netty,因为它是可以正常工作的旧代码。但是,即使比更改服务器更难,我当然也可以更改客户端。 为这个服务器编写的客户端相当简单, 虽然显然非常简单,但是该客户端显然可以与以前的服务器实现一起很好地工作,不幸的是,该客户端丢失了并且无法恢复。 那么问题可能出在哪里?当客户发送一个很长的字符串(一百万个或更多字符)时,为什么Netty有时会过早调用channelRead0
接收数据(每块65,536个字符)。但是有时,并不是每次都在整个文本被接收之前调用channelReadComplete
,这会导致整个消息被无意地拆分为2。但是服务器和客户端上似乎都没有异常。public class TCPServer{
EventLoopGroup connectionGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
void start(int port) throws InterruptedException {
try {
ServerBootstrap b = new ServerBootstrap();
b.group(connectionGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TCPChannelInitializer())
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
if(f.isSuccess()){
// ...
}
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
connectionGroup.shutdownGracefully();
}
}
void stop(){
workerGroup.shutdownGracefully();
connectionGroup.shutdownGracefully();
}
}
public class TCPChannelInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new SimpleHandler());
}
}
public class SimpleHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
String received = (String) msg;
// Save chunk of string somewhere
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// Whole message is supposedly here
// Call a blocking logic function here that processes the received string
ctx.write("{\"result\": \"some result sent back to the server\"}");
ctx.flush();
super.channelReadComplete(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
try (var socket = new Socket(host, port)) {
var in = new Scanner(socket.getInputStream());
var out = socket.getOutputStream();
try {
Thread.sleep(withDelay);
} catch (InterruptedException e) {
e.printStackTrace();
}
// line is just a string of an absolutely random length (without line breaks)
System.out.println(String.format("Sending text of length: %d", line.length()));
byte[] bArr = line.getBytes();
try {
out.write(bArr, 0, bArr.length);
out.flush();
System.out.println(in.nextLine());
} catch (IOException e) {
e.printStackTrace();
}
}
channelReadComplete
?
是否存在一些大小限制,超时或其他我可能会缺少的东西? 0 个答案:
没有答案