Skip to content

Commit 18c8f23

Browse files
authored
Merge pull request #771 from GaryLeung922/hotfix
Hotfix:multy node with redisson receive multy same message
2 parents 72cfd61 + 452870d commit 18c8f23

5 files changed

Lines changed: 276 additions & 116 deletions

File tree

src/main/java/com/corundumstudio/socketio/BroadcastOperations.java

Lines changed: 9 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -15,123 +15,24 @@
1515
*/
1616
package com.corundumstudio.socketio;
1717

18-
import java.util.Arrays;
19-
import java.util.Collection;
20-
import java.util.HashMap;
21-
import java.util.HashSet;
22-
import java.util.Map;
23-
import java.util.Map.Entry;
24-
import java.util.Set;
25-
26-
import com.corundumstudio.socketio.misc.IterableCollection;
27-
import com.corundumstudio.socketio.namespace.Namespace;
2818
import com.corundumstudio.socketio.protocol.Packet;
29-
import com.corundumstudio.socketio.protocol.PacketType;
30-
import com.corundumstudio.socketio.store.StoreFactory;
31-
import com.corundumstudio.socketio.store.pubsub.DispatchMessage;
32-
import com.corundumstudio.socketio.store.pubsub.PubSubType;
19+
20+
import java.util.Collection;
3321

3422
/**
35-
* Fully thread-safe.
23+
* broadcast interface
3624
*
3725
*/
38-
public class BroadcastOperations implements ClientOperations {
39-
40-
private final Iterable<SocketIOClient> clients;
41-
private final StoreFactory storeFactory;
42-
43-
public BroadcastOperations(Iterable<SocketIOClient> clients, StoreFactory storeFactory) {
44-
super();
45-
this.clients = clients;
46-
this.storeFactory = storeFactory;
47-
}
48-
49-
private void dispatch(Packet packet) {
50-
Map<String, Set<String>> namespaceRooms = new HashMap<String, Set<String>>();
51-
for (SocketIOClient socketIOClient : clients) {
52-
Namespace namespace = (Namespace)socketIOClient.getNamespace();
53-
Set<String> rooms = namespace.getRooms(socketIOClient);
54-
55-
Set<String> roomsList = namespaceRooms.get(namespace.getName());
56-
if (roomsList == null) {
57-
roomsList = new HashSet<String>();
58-
namespaceRooms.put(namespace.getName(), roomsList);
59-
}
60-
roomsList.addAll(rooms);
61-
}
62-
for (Entry<String, Set<String>> entry : namespaceRooms.entrySet()) {
63-
for (String room : entry.getValue()) {
64-
storeFactory.pubSubStore().publish(PubSubType.DISPATCH, new DispatchMessage(room, packet, entry.getKey()));
65-
}
66-
}
67-
}
68-
69-
public Collection<SocketIOClient> getClients() {
70-
return new IterableCollection<SocketIOClient>(clients);
71-
}
72-
73-
@Override
74-
public void send(Packet packet) {
75-
for (SocketIOClient client : clients) {
76-
client.send(packet);
77-
}
78-
dispatch(packet);
79-
}
80-
81-
public <T> void send(Packet packet, BroadcastAckCallback<T> ackCallback) {
82-
for (SocketIOClient client : clients) {
83-
client.send(packet, ackCallback.createClientCallback(client));
84-
}
85-
ackCallback.loopFinished();
86-
}
26+
public interface BroadcastOperations extends ClientOperations {
8727

88-
@Override
89-
public void disconnect() {
90-
for (SocketIOClient client : clients) {
91-
client.disconnect();
92-
}
93-
}
28+
Collection<SocketIOClient> getClients();
9429

95-
public void sendEvent(String name, SocketIOClient excludedClient, Object... data) {
96-
Packet packet = new Packet(PacketType.MESSAGE);
97-
packet.setSubType(PacketType.EVENT);
98-
packet.setName(name);
99-
packet.setData(Arrays.asList(data));
30+
<T> void send(Packet packet, BroadcastAckCallback<T> ackCallback);
10031

101-
for (SocketIOClient client : clients) {
102-
if (client.getSessionId().equals(excludedClient.getSessionId())) {
103-
continue;
104-
}
105-
client.send(packet);
106-
}
107-
dispatch(packet);
108-
}
109-
110-
@Override
111-
public void sendEvent(String name, Object... data) {
112-
Packet packet = new Packet(PacketType.MESSAGE);
113-
packet.setSubType(PacketType.EVENT);
114-
packet.setName(name);
115-
packet.setData(Arrays.asList(data));
116-
send(packet);
117-
}
32+
void sendEvent(String name, SocketIOClient excludedClient, Object... data);
11833

119-
public <T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback) {
120-
for (SocketIOClient client : clients) {
121-
client.sendEvent(name, ackCallback.createClientCallback(client), data);
122-
}
123-
ackCallback.loopFinished();
124-
}
125-
126-
public <T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback) {
127-
for (SocketIOClient client : clients) {
128-
if (client.getSessionId().equals(excludedClient.getSessionId())) {
129-
continue;
130-
}
131-
client.sendEvent(name, ackCallback.createClientCallback(client), data);
132-
}
133-
ackCallback.loopFinished();
134-
}
34+
<T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback);
13535

36+
<T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback);
13637

13738
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* Copyright (c) 2012-2019 Nikita Koksharov
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.corundumstudio.socketio;
17+
18+
import com.corundumstudio.socketio.protocol.Packet;
19+
20+
import java.util.Collection;
21+
import java.util.HashSet;
22+
import java.util.Set;
23+
24+
/**
25+
* @Author: liangjiaqi
26+
* @Date: 2020/8/8 6:02 PM
27+
*/
28+
public class MultiRoomBroadcastOperations implements BroadcastOperations {
29+
30+
private Collection<BroadcastOperations> broadcastOperations;
31+
32+
public MultiRoomBroadcastOperations(Collection<BroadcastOperations> broadcastOperations) {
33+
this.broadcastOperations = broadcastOperations;
34+
}
35+
36+
@Override
37+
public Collection<SocketIOClient> getClients() {
38+
Set<SocketIOClient> clients = new HashSet<SocketIOClient>();
39+
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
40+
return clients;
41+
}
42+
for( BroadcastOperations b : this.broadcastOperations ) {
43+
clients.addAll( b.getClients() );
44+
}
45+
return clients;
46+
}
47+
48+
@Override
49+
public <T> void send(Packet packet, BroadcastAckCallback<T> ackCallback) {
50+
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
51+
return;
52+
}
53+
for( BroadcastOperations b : this.broadcastOperations ) {
54+
b.send( packet, ackCallback );
55+
}
56+
}
57+
58+
@Override
59+
public void sendEvent(String name, SocketIOClient excludedClient, Object... data) {
60+
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
61+
return;
62+
}
63+
for( BroadcastOperations b : this.broadcastOperations ) {
64+
b.sendEvent( name, excludedClient, data );
65+
}
66+
}
67+
68+
@Override
69+
public <T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback) {
70+
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
71+
return;
72+
}
73+
for( BroadcastOperations b : this.broadcastOperations ) {
74+
b.sendEvent( name, data, ackCallback );
75+
}
76+
}
77+
78+
@Override
79+
public <T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback) {
80+
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
81+
return;
82+
}
83+
for( BroadcastOperations b : this.broadcastOperations ) {
84+
b.sendEvent( name, data, excludedClient, ackCallback );
85+
}
86+
}
87+
88+
@Override
89+
public void send(Packet packet) {
90+
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
91+
return;
92+
}
93+
for( BroadcastOperations b : this.broadcastOperations ) {
94+
b.send( packet );
95+
}
96+
}
97+
98+
@Override
99+
public void disconnect() {
100+
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
101+
return;
102+
}
103+
for( BroadcastOperations b : this.broadcastOperations ) {
104+
b.disconnect();
105+
}
106+
}
107+
108+
@Override
109+
public void sendEvent(String name, Object... data) {
110+
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) {
111+
return;
112+
}
113+
for( BroadcastOperations b : this.broadcastOperations ) {
114+
b.sendEvent( name, data );
115+
}
116+
}
117+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/**
2+
* Copyright (c) 2012-2019 Nikita Koksharov
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.corundumstudio.socketio;
17+
18+
import com.corundumstudio.socketio.misc.IterableCollection;
19+
import com.corundumstudio.socketio.protocol.Packet;
20+
import com.corundumstudio.socketio.protocol.PacketType;
21+
import com.corundumstudio.socketio.store.StoreFactory;
22+
import com.corundumstudio.socketio.store.pubsub.DispatchMessage;
23+
import com.corundumstudio.socketio.store.pubsub.PubSubType;
24+
25+
import java.util.Arrays;
26+
import java.util.Collection;
27+
28+
/**
29+
* @Author: liangjiaqi
30+
* @Date: 2020/8/8 6:08 PM
31+
*/
32+
public class SingleRoomBroadcastOperations implements BroadcastOperations {
33+
private final String namespace;
34+
private final String room;
35+
private final Iterable<SocketIOClient> clients;
36+
private final StoreFactory storeFactory;
37+
38+
public SingleRoomBroadcastOperations(String namespace, String room, Iterable<SocketIOClient> clients, StoreFactory storeFactory) {
39+
super();
40+
this.namespace = namespace;
41+
this.room = room;
42+
this.clients = clients;
43+
this.storeFactory = storeFactory;
44+
}
45+
46+
private void dispatch(Packet packet) {
47+
this.storeFactory.pubSubStore().publish(
48+
PubSubType.DISPATCH,
49+
new DispatchMessage(this.room, packet, this.namespace));
50+
}
51+
52+
@Override
53+
public Collection<SocketIOClient> getClients() {
54+
return new IterableCollection<SocketIOClient>(clients);
55+
}
56+
57+
@Override
58+
public void send(Packet packet) {
59+
for (SocketIOClient client : clients) {
60+
client.send(packet);
61+
}
62+
dispatch(packet);
63+
}
64+
65+
@Override
66+
public <T> void send(Packet packet, BroadcastAckCallback<T> ackCallback) {
67+
for (SocketIOClient client : clients) {
68+
client.send(packet, ackCallback.createClientCallback(client));
69+
}
70+
ackCallback.loopFinished();
71+
}
72+
73+
@Override
74+
public void disconnect() {
75+
for (SocketIOClient client : clients) {
76+
client.disconnect();
77+
}
78+
}
79+
80+
@Override
81+
public void sendEvent(String name, SocketIOClient excludedClient, Object... data) {
82+
Packet packet = new Packet(PacketType.MESSAGE);
83+
packet.setSubType(PacketType.EVENT);
84+
packet.setName(name);
85+
packet.setData(Arrays.asList(data));
86+
87+
for (SocketIOClient client : clients) {
88+
if (client.getSessionId().equals(excludedClient.getSessionId())) {
89+
continue;
90+
}
91+
client.send(packet);
92+
}
93+
dispatch(packet);
94+
}
95+
96+
@Override
97+
public void sendEvent(String name, Object... data) {
98+
Packet packet = new Packet(PacketType.MESSAGE);
99+
packet.setSubType(PacketType.EVENT);
100+
packet.setName(name);
101+
packet.setData(Arrays.asList(data));
102+
send(packet);
103+
}
104+
105+
@Override
106+
public <T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback) {
107+
for (SocketIOClient client : clients) {
108+
client.sendEvent(name, ackCallback.createClientCallback(client), data);
109+
}
110+
ackCallback.loopFinished();
111+
}
112+
113+
@Override
114+
public <T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback) {
115+
for (SocketIOClient client : clients) {
116+
if (client.getSessionId().equals(excludedClient.getSessionId())) {
117+
continue;
118+
}
119+
client.sendEvent(name, ackCallback.createClientCallback(client), data);
120+
}
121+
ackCallback.loopFinished();
122+
}
123+
}

0 commit comments

Comments
 (0)