public class ChatServer {
private int port;
private Map<String, Channel channels = new ConcurrentHashMap<();
private DefaultEventExecutorGroup executorGroup;
public ChatServer(int port) {
this.port = port;
// 创建线程池用于处理广播操作,避免阻塞Netty的I/O线程
executorGroup = new DefaultEventExecutorGroup(16);
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
System.out.println(Chat server is starting at port + port);
ChannelFuture f = b.bind(port).sync();
System.out.println(Chat server started successfully);
public void addChannel(String id, Channel channel) {
channels.put(id, channel);
}
public void removeChannel(String id) {
channels.remove(id);
}
public void broadcastMessage(String message) {
executorGroup.execute(() - {
for (Channel channel : channels.values()) {
channel.writeAndFlush(message + \n);
}
});
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length 0) {
port = Integer.parseInt(args[0]);
}
new ChatServer(port).run();
}
}
public ChatServerHandler(ChatServer server) {
this.server = server;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 生成一个唯一的ID,使用短ID
id = ctx.channel().id().asShortText();
server.addChannel(id, ctx.channel());
System.out.println(Client connected: + id);
server.broadcastMessage(User + id + has joined the chat);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
server.removeChannel(id);
System.out.println(Client disconnected: + id);
server.broadcastMessage(User + id + has left the chat);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println(Received message from + id + : + message);
// 如果是quit命令,断开连接
if (quit.equalsIgnoreCase(message)) {
ctx.close();
return;
}
// 广播消息,不包括发送者自己
server.broadcastMessage([ + id + ] + message);
}
public class ChatClient {
private String host;
private int port;
public ChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ChatClientHandler());
}
});
System.out.println(Connecting to server at + host + : + port);
ChannelFuture f = b.connect(host, port).sync();
System.out.println(Connected to server);
// 读取用户输入
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String line = reader.readLine();
if (line == null) {
break;
}
f.channel().writeAndFlush(line);
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
String host = localhost;
int port = 8080;
if (args.length 0) {
port = Integer.parseInt(args[0]);
}
if (args.length 1) {
host = args[1];
}
new ChatClient(host, port).run();
}
}