Skip to content

Commit e079eb2

Browse files
authored
cluster (#19)
1 parent 417e984 commit e079eb2

File tree

6 files changed

+291
-99
lines changed

6 files changed

+291
-99
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ plugins {
88
id 'signing'
99
}
1010

11-
def jarVersion = "1.0.10"
11+
def jarVersion = "1.0.11"
1212
group = 'io.nats'
1313

1414
def isMerge = System.getenv("BUILD_EVENT") == "push"
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2022 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at:
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
package nats.io;
14+
15+
import java.util.List;
16+
17+
class ClusterInsert {
18+
public int id;
19+
public int port;
20+
public int listen;
21+
public String[] configInserts;
22+
23+
public void setInsert(List<String> configInsertsList) {
24+
int lines = configInsertsList.size();
25+
this.configInserts = new String[lines];
26+
for (int x = 0; x < lines; x++) {
27+
configInserts[x] = configInsertsList.get(x);
28+
}
29+
}
30+
31+
@Override
32+
public String toString() {
33+
return "ClusterInsert" +
34+
"\n id=" + id +
35+
"\n port=" + port +
36+
"\n listen=" + listen +
37+
"\n configInserts:" + insertsString();
38+
}
39+
40+
private String insertsString() {
41+
StringBuilder sb = new StringBuilder();
42+
for (String s : configInserts) {
43+
sb.append("\n ");
44+
sb.append(s);
45+
}
46+
return sb.toString();
47+
}
48+
}

src/main/java/nats/io/NatsRunnerUtils.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.io.InputStreamReader;
1818
import java.net.ServerSocket;
1919
import java.util.ArrayList;
20+
import java.util.List;
2021

2122
public class NatsRunnerUtils {
2223
public static final String NATS_SERVER_PATH_ENV = "nats_server_path";
@@ -116,4 +117,64 @@ public static String getNatsServerVersionString() {
116117
return null;
117118
}
118119
}
120+
121+
public static List<ClusterInsert> createClusterInserts() throws IOException {
122+
return createClusterInserts(3, "clstr", "srvr");
123+
}
124+
125+
public static List<ClusterInsert> createClusterInserts(int count) throws IOException {
126+
return createClusterInserts(count, "clstr", "srvr");
127+
}
128+
129+
public static List<ClusterInsert> createClusterInserts(int count, String clusterName, String serverNamePrefix) throws IOException {
130+
List<ClusterInsert> clusterInserts = new ArrayList<>();
131+
for (int x = 0; x < count; x++) {
132+
ClusterInsert ci = new ClusterInsert();
133+
ci.id = x + 1;
134+
ci.port = nextPort();
135+
ci.listen = nextPort();
136+
clusterInserts.add(ci);
137+
}
138+
return finishCreateClusterInserts(clusterName, serverNamePrefix, clusterInserts);
139+
}
140+
141+
public static List<ClusterInsert> createClusterInserts(int[] ports, int[] listens, String clusterName, String serverNamePrefix) throws IOException {
142+
List<ClusterInsert> clusterInserts = new ArrayList<>();
143+
for (int x = 0; x < ports.length; x++) {
144+
ClusterInsert ci = new ClusterInsert();
145+
ci.id = x + 1;
146+
ci.port = ports[x];
147+
ci.listen = listens[x];
148+
clusterInserts.add(ci);
149+
}
150+
return finishCreateClusterInserts(clusterName, serverNamePrefix, clusterInserts);
151+
}
152+
153+
private static List<ClusterInsert> finishCreateClusterInserts(String clusterName, String serverNamePrefix, List<ClusterInsert> clusterInserts) {
154+
for (ClusterInsert ci : clusterInserts) {
155+
List<String> lines = new ArrayList<>();
156+
lines.add("server_name=" + serverNamePrefix + ci.id);
157+
lines.add("cluster {");
158+
lines.add(" name: " + clusterName);
159+
lines.add(" listen: 127.0.0.1:" + ci.listen);
160+
lines.add(" routes: [");
161+
for (ClusterInsert ciRoutes : clusterInserts) {
162+
if (ciRoutes.id != ci.id) {
163+
lines.add(" nats-route://127.0.0.1:" + ciRoutes.listen);
164+
}
165+
}
166+
lines.add(" ]");
167+
lines.add("}");
168+
ci.setInsert(lines);
169+
}
170+
return clusterInserts;
171+
}
172+
173+
public static void main(String[] args) throws IOException {
174+
List<ClusterInsert> clusterInserts = createClusterInserts(4);
175+
System.out.println(clusterInserts.get(0));
176+
System.out.println(clusterInserts.get(1));
177+
System.out.println(clusterInserts.get(2));
178+
System.out.println(clusterInserts.get(3));
179+
}
119180
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright 2022 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at:
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
package nats.io;
14+
15+
import org.junit.jupiter.api.Test;
16+
17+
import java.io.IOException;
18+
import java.util.List;
19+
20+
import static nats.io.NatsRunnerUtils.createClusterInserts;
21+
22+
class NatsClusterTest extends TestBase {
23+
24+
@Test
25+
public void testCreateCluster() throws IOException, InterruptedException {
26+
List<ClusterInsert> clusterInserts = createClusterInserts();
27+
ClusterInsert ci1 = clusterInserts.get(0);
28+
ClusterInsert ci2 = clusterInserts.get(1);
29+
ClusterInsert ci3 = clusterInserts.get(2);
30+
31+
try (NatsServerRunner runner1 = new NatsServerRunner(ci1.port, false, false, null, ci1.configInserts, null)) {
32+
try (NatsServerRunner runner2 = new NatsServerRunner(ci2.port, false, false, null, ci2.configInserts, null)) {
33+
try (NatsServerRunner runner3 = new NatsServerRunner(ci3.port, false, false, null, ci3.configInserts, null)) {
34+
35+
Thread.sleep(5000); // give servers time to spin up and be ready
36+
37+
validateCommandLine(runner1, false, false);
38+
validateHostAndPort(runner1);
39+
validateConfigLines(runner1);
40+
connect(runner1);
41+
42+
validateCommandLine(runner2, false, false);
43+
validateHostAndPort(runner2);
44+
validateConfigLines(runner2);
45+
connect(runner2);
46+
47+
validateCommandLine(runner3, false, false);
48+
validateHostAndPort(runner3);
49+
validateConfigLines(runner3);
50+
connect(runner3);
51+
}
52+
}
53+
}
54+
}
55+
}
Lines changed: 13 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,27 @@
1-
/*
2-
* This Java source file was generated by the Gradle 'init' task.
3-
*/
1+
// Copyright 2022 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at:
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
413
package nats.io;
514

615
import org.junit.jupiter.api.Test;
716
import org.junit.jupiter.params.ParameterizedTest;
817
import org.junit.jupiter.params.provider.Arguments;
918
import org.junit.jupiter.params.provider.MethodSource;
1019

11-
import java.io.File;
1220
import java.io.IOException;
13-
import java.io.InputStream;
14-
import java.net.InetSocketAddress;
15-
import java.net.Socket;
16-
import java.net.SocketAddress;
17-
import java.nio.file.Files;
1821
import java.util.Arrays;
19-
import java.util.Collections;
20-
import java.util.List;
2122
import java.util.stream.Stream;
2223

23-
import static java.util.stream.Collectors.toList;
24-
import static nats.io.NatsRunnerUtils.DEBUG_OPTION;
25-
import static nats.io.NatsRunnerUtils.JETSTREAM_OPTION;
26-
import static org.junit.jupiter.api.Assertions.assertEquals;
27-
import static org.junit.jupiter.api.Assertions.assertTrue;
28-
import static org.junit.platform.commons.util.CollectionUtils.toUnmodifiableList;
29-
30-
class NatsServerRunnerTest {
31-
32-
public static final String SOURCE_CONFIG_FILE_PATH = "src/test/resources/";
24+
class NatsServerRunnerTest extends TestBase {
3325

3426
@Test
3527
public void testWithoutConfigDefault() throws IOException, InterruptedException {
@@ -95,81 +87,4 @@ public void testWithConfig(String configFile, boolean checkConnect) throws IOExc
9587
}
9688
}
9789
}
98-
99-
private void validateCommandLine(NatsServerRunner runner, boolean debug, boolean jetStream, String... customArgs) {
100-
assertEquals(debug, runner.getCmdLine().contains(" " + DEBUG_OPTION));
101-
assertEquals(jetStream, runner.getCmdLine().contains(" " + JETSTREAM_OPTION));
102-
for (String ca : customArgs) {
103-
assertTrue(runner.getCmdLine().contains(" " + ca));
104-
}
105-
}
106-
107-
private void validateHostAndPort(NatsServerRunner server) {
108-
assertTrue(server.getPort() > 0);
109-
assertTrue(server.getPort() != 1234);
110-
assertTrue(server.getURI().startsWith("nats://localhost"));
111-
}
112-
113-
private void validateConfigLines(NatsServerRunner runner) throws IOException {
114-
validateConfigLines(runner, null);
115-
}
116-
117-
private void validateConfigLines(NatsServerRunner runner, String configFile, String[] configInserts) throws IOException {
118-
List<String> expected = Files.lines(new File(SOURCE_CONFIG_FILE_PATH + configFile).toPath())
119-
.map(String::trim)
120-
.filter(s -> s.length() > 0)
121-
.filter(s -> !s.contains("port"))
122-
.collect(toList());
123-
Collections.addAll(expected, configInserts);
124-
validateConfigLines(runner, expected);
125-
}
126-
127-
private void validateConfigLines(NatsServerRunner runner, List<String> expected) throws IOException {
128-
List<String> lines = Files.lines(new File(runner.getConfigFile()).toPath())
129-
.map(String::trim)
130-
.filter(s -> s.length() > 0)
131-
.collect(toUnmodifiableList());
132-
133-
assertTrue(lines.contains("port: " + runner.getPort()));
134-
if (expected != null) {
135-
for (String ex : expected) {
136-
assertTrue(lines.contains(ex));
137-
}
138-
}
139-
}
140-
141-
private static final byte[] CONNECT_BYTES = "CONNECT {\"lang\":\"java\",\"version\":\"2.11.5\",\"protocol\":1,\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n".getBytes();
142-
private void connect(NatsServerRunner runner) throws IOException {
143-
Socket socket = new Socket();
144-
SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", runner.getPort());
145-
socket.connect(socketAddress);
146-
assertEquals(runner.getPort(), socket.getPort());
147-
148-
socket.getOutputStream().write(CONNECT_BYTES);
149-
socket.getOutputStream().flush();
150-
151-
InputStream in = socket.getInputStream();
152-
// give the server time to respond or this flaps
153-
try {
154-
Thread.sleep(100);
155-
} catch (InterruptedException e) {
156-
// ignore
157-
}
158-
159-
StringBuilder sb = new StringBuilder();
160-
int cr = 0;
161-
int i = in.read();
162-
while (i != -1) {
163-
sb.append((char)i);
164-
if (i == 13) {
165-
cr++;
166-
}
167-
i = (cr > 1) ? -1 : in.read();
168-
}
169-
in.close();
170-
171-
String sbs = sb.toString().trim();
172-
assertTrue(sbs.startsWith("INFO"));
173-
assertTrue(sbs.contains("\"port\":" + runner.getPort()));
174-
}
17590
}

0 commit comments

Comments
 (0)