From 323e03010a04f4527f4098eeafa5844751b4515a Mon Sep 17 00:00:00 2001 From: vivekkoya <13vivekkoya@gmail.com> Date: Wed, 22 Oct 2025 21:28:28 -0700 Subject: [PATCH 1/5] Add necessary imports and fix mistakes in StreamingTest --- .../cassandra/net/OutboundConnection.java | 54 +++-- .../net/OutboundConnectionInitiator.java | 19 +- .../apache/cassandra/net/StreamingTest.java | 208 ++++++++++++++++++ 3 files changed, 249 insertions(+), 32 deletions(-) create mode 100644 test/unit/org/apache/cassandra/net/StreamingTest.java diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java index 4aa754d8aa31..3113f462e0b4 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -1127,42 +1127,38 @@ void onCompletedHandshake(Result result) // it is expected that close, if successful, has already cancelled us; so we do not need to worry about leaking connections assert !state.isClosed(); - MessagingSuccess success = result.success(); - debug.onConnect(success.messagingVersion, settings); - state.disconnected().maintenance.cancel(false); - - FrameEncoder.PayloadAllocator payloadAllocator = success.allocator; - Channel channel = success.channel; - Established established = new Established(success.messagingVersion, channel, payloadAllocator, settings); - state = established; - channel.pipeline().addLast("handleExceptionalStates", new ChannelInboundHandlerAdapter() { - @Override - public void channelInactive(ChannelHandlerContext ctx) - { - disconnectNow(established); - ctx.fireChannelInactive(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - { - try - { - invalidateChannel(established, cause); + if (result.success() instanceof MessagingSuccess success) { + debug.onConnect(success.messagingVersion, settings); + state.disconnected().maintenance.cancel(false); + + FrameEncoder.PayloadAllocator payloadAllocator = success.allocator; + Channel channel = success.channel; + Established established = new Established(success.messagingVersion, channel, payloadAllocator, settings); + state = established; + channel.pipeline().addLast("handleExceptionalStates", new ChannelInboundHandlerAdapter() { + @Override + public void channelInactive(ChannelHandlerContext ctx) { + disconnectNow(established); + ctx.fireChannelInactive(); } - catch (Throwable t) - { - logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + try { + invalidateChannel(established, cause); + } catch (Throwable t) { + logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t); + } } - } - }); - ++successfulConnections; + }); + ++successfulConnections; - logger.info("{} successfully connected, version = {}, framing = {}, encryption = {}", + logger.info("{} successfully connected, version = {}, framing = {}, encryption = {}", id(true), success.messagingVersion, settings.framing, encryptionConnectionSummary(channel)); + } break; case RETRY: diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java index 2bddc174f2e8..3c6114faec55 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java @@ -525,14 +525,27 @@ private Result(Outcome outcome) } boolean isSuccess() { return outcome == Outcome.SUCCESS; } - public SuccessType success() { return (SuccessType) this; } + public Success success() { + if (this instanceof Success success) + return success; + return null; + } static MessagingSuccess messagingSuccess(Channel channel, int messagingVersion, FrameEncoder.PayloadAllocator allocator) { return new MessagingSuccess(channel, messagingVersion, allocator); } static StreamingSuccess streamingSuccess(Channel channel, int messagingVersion) { return new StreamingSuccess(channel, messagingVersion); } - public Retry retry() { return (Retry) this; } + public Retry retry() { + if (this instanceof Retry retry) + return retry; + return null; + } static Result retry(int withMessagingVersion) { return new Retry<>(withMessagingVersion); } - public Incompatible incompatible() { return (Incompatible) this; } + public Incompatible incompatible() + { + if (this instanceof Incompatible incompatible) + return incompatible; + return null; + } static Result incompatible(int closestSupportedVersion, int maxMessagingVersion) { return new Incompatible(closestSupportedVersion, maxMessagingVersion); } } diff --git a/test/unit/org/apache/cassandra/net/StreamingTest.java b/test/unit/org/apache/cassandra/net/StreamingTest.java new file mode 100644 index 000000000000..82b3ca3ba358 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/StreamingTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net; + +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import com.google.common.net.InetAddresses; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.Future; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.Builder; +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.gms.GossipDigestSyn; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.security.DefaultSslContextFactory; +import org.apache.cassandra.transport.TlsTestUtils; + +import static org.apache.cassandra.net.OutboundConnectionInitiator.Result; +import static org.apache.cassandra.net.OutboundConnectionInitiator.SslFallbackConnectionType; +import static org.apache.cassandra.net.OutboundConnectionInitiator.initiateStreaming; +import static org.apache.cassandra.net.MessagingService.current_version; +import static org.apache.cassandra.net.MessagingService.minimum_version; +import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions.ClientAuth.NOT_REQUIRED; +import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions.ClientAuth.REQUIRED; +import static org.apache.cassandra.tcm.ClusterMetadata.EMPTY_METADATA_IDENTIFIER; +public class StreamingTest +{ + private static final SocketFactory factory = new SocketFactory(); + static final InetAddressAndPort TO_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 7012); + static final InetAddressAndPort FROM_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 7012); + private volatile Throwable handshakeEx; + @BeforeClass + public static void startup() + { + DatabaseDescriptor.daemonInitialization(); + CommitLog.instance.start(); + } + + @AfterClass + public static void cleanup() throws InterruptedException + { + factory.shutdownNow(); + } + + @Before + public void setup() + { + handshakeEx = null; + } + + private Future> streamingConnect(AcceptVersions acceptOutbound, AcceptVersions acceptInbound) throws ExecutionException, InterruptedException + { + InboundSockets inbound = new InboundSockets(new InboundConnectionSettings().withAcceptMessaging(acceptInbound)); + try + { + inbound.open(); + InetAddressAndPort endpoint = inbound.sockets().stream().map(s -> s.settings.bindAddress).findFirst().get(); + EventLoop eventLoop = factory.defaultGroup().next(); + Future> result = initiateStreaming(eventLoop, + new OutboundConnectionSettings(endpoint) + .withAcceptVersions(acceptOutbound) + .withDefaults(ConnectionCategory.STREAMING), + SslFallbackConnectionType.SERVER_CONFIG + ); + result.awaitUninterruptibly(); + return result; + } + finally + { + inbound.close().await(1L, TimeUnit.SECONDS); + } + } + + @Test + public void testIncompatibleVersion() throws InterruptedException, ExecutionException + { + Future> result = streamingConnect(new AcceptVersions(current_version + 1, current_version + 1), new AcceptVersions(minimum_version + 2, current_version + 3)); + if (result.isSuccess()) { + Result nowResult = result.getNow(); + Assert.assertNull(nowResult.success()); + Assert.assertEquals(Result.Outcome.INCOMPATIBLE, nowResult.outcome); + Assert.assertEquals(current_version, nowResult.incompatible().closestSupportedVersion); + Assert.assertEquals(current_version, nowResult.incompatible().maxMessagingVersion); + + } else { + Assert.assertTrue(false); + } + } + + @Test + public void testCompatibleVersion() throws InterruptedException, ExecutionException + { + Future> result = streamingConnect(new AcceptVersions(MessagingService.minimum_version, current_version + 1), new AcceptVersions(minimum_version + 2, current_version + 3)); + if (result.isSuccess()) { + Result nowResult = result.getNow(); + Assert.assertNotNull(nowResult.success().channel); + Assert.assertEquals(Result.Outcome.SUCCESS, nowResult.outcome); + Assert.assertEquals(current_version, nowResult.success().messagingVersion); + } else { + Assert.assertTrue(false); + } + } + + private ServerEncryptionOptions getServerEncryptionOptions(SslFallbackConnectionType sslConnectionType, boolean optional) + { + Builder serverEncryptionOptionsBuilder = new Builder(); + + serverEncryptionOptionsBuilder.withOutboundKeystore(TlsTestUtils.SERVER_OUTBOUND_KEYSTORE_PATH) + .withOutboundKeystorePassword(TlsTestUtils.SERVER_OUTBOUND_KEYSTORE_PASSWORD) + .withOptional(optional) + .withKeyStore(TlsTestUtils.SERVER_KEYSTORE_PATH) + .withKeyStorePassword(TlsTestUtils.SERVER_KEYSTORE_PASSWORD) + .withTrustStore(TlsTestUtils.SERVER_TRUSTSTORE_PATH).withTrustStorePassword(TlsTestUtils.SERVER_TRUSTSTORE_PASSWORD) + .withSslContextFactory((new ParameterizedClass(DefaultSslContextFactory.class.getName(), + new HashMap<>()))); + + if (sslConnectionType == SslFallbackConnectionType.MTLS) + { + serverEncryptionOptionsBuilder.withInternodeEncryption(ServerEncryptionOptions.InternodeEncryption.all) + .withRequireClientAuth(REQUIRED); + } + else if (sslConnectionType == SslFallbackConnectionType.SSL) + { + serverEncryptionOptionsBuilder.withInternodeEncryption(ServerEncryptionOptions.InternodeEncryption.all) + .withRequireClientAuth(NOT_REQUIRED); + } + return serverEncryptionOptionsBuilder.build(); + } + + private OutboundConnection initiateOutbound(InetAddressAndPort endpoint, SslFallbackConnectionType connectionType, boolean optional) throws ClosedChannelException + { + final OutboundConnectionSettings settings = new OutboundConnectionSettings(endpoint) + .withAcceptVersions(new AcceptVersions(minimum_version, current_version)) + .withDefaults(ConnectionCategory.MESSAGING) + .withEncryption(getServerEncryptionOptions(connectionType, optional)) + .withDebugCallbacks(new HandshakeAcknowledgeChecker(t -> handshakeEx = t)) + .withFrom(FROM_ADDR); + OutboundConnections outboundConnections = OutboundConnections.tryRegister(new ConcurrentHashMap<>(), TO_ADDR, settings); + GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner", EMPTY_METADATA_IDENTIFIER, new ArrayList<>(0)); + Message message = Message.out(Verb.GOSSIP_DIGEST_SYN, syn); + OutboundConnection outboundConnection = outboundConnections.connectionFor(message); + outboundConnection.enqueue(message); + return outboundConnection; + } + private static class HandshakeAcknowledgeChecker implements OutboundDebugCallbacks + { + private final AtomicInteger acks = new AtomicInteger(0); + private final Consumer fail; + + private HandshakeAcknowledgeChecker(Consumer fail) + { + this.fail = fail; + } + + @Override + public void onSendSmallFrame(int messageCount, int payloadSizeInBytes) + { + } + + @Override + public void onSentSmallFrame(int messageCount, int payloadSizeInBytes) + { + } + + @Override + public void onFailedSmallFrame(int messageCount, int payloadSizeInBytes) + { + } + + @Override + public void onConnect(int messagingVersion, OutboundConnectionSettings settings) + { + if (acks.incrementAndGet() > 1) + fail.accept(new AssertionError("Handshake was acknowledged more than once")); + } + } +} \ No newline at end of file From 0dcd1f32d1068cbf7ff01a1d0776f30ec7f332ab Mon Sep 17 00:00:00 2001 From: vivekkoya <13vivekkoya@gmail.com> Date: Mon, 27 Oct 2025 18:05:42 -0700 Subject: [PATCH 2/5] JDK 8 compatible code Signed-off-by: vivekkoya <13vivekkoya@gmail.com> --- .../cassandra/net/OutboundConnection.java | 3 +- .../net/OutboundConnectionInitiator.java | 12 +++---- .../apache/cassandra/net/StreamingTest.java | 36 ++++++++----------- 3 files changed, 22 insertions(+), 29 deletions(-) diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java index 3113f462e0b4..8894e965ac9e 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -1127,7 +1127,8 @@ void onCompletedHandshake(Result result) // it is expected that close, if successful, has already cancelled us; so we do not need to worry about leaking connections assert !state.isClosed(); - if (result.success() instanceof MessagingSuccess success) { + if (result.success() instanceof MessagingSuccess) { + MessagingSuccess success = (MessagingSuccess) result.success(); debug.onConnect(success.messagingVersion, settings); state.disconnected().maintenance.cancel(false); diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java index 3c6114faec55..db0213c1f85b 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java @@ -526,24 +526,24 @@ private Result(Outcome outcome) boolean isSuccess() { return outcome == Outcome.SUCCESS; } public Success success() { - if (this instanceof Success success) - return success; + if (this instanceof Success) + return (Success) this; return null; } static MessagingSuccess messagingSuccess(Channel channel, int messagingVersion, FrameEncoder.PayloadAllocator allocator) { return new MessagingSuccess(channel, messagingVersion, allocator); } static StreamingSuccess streamingSuccess(Channel channel, int messagingVersion) { return new StreamingSuccess(channel, messagingVersion); } public Retry retry() { - if (this instanceof Retry retry) - return retry; + if (this instanceof Retry) + return (Retry) this; return null; } static Result retry(int withMessagingVersion) { return new Retry<>(withMessagingVersion); } public Incompatible incompatible() { - if (this instanceof Incompatible incompatible) - return incompatible; + if (this instanceof Incompatible) + return (Incompatible) this; return null; } static Result incompatible(int closestSupportedVersion, int maxMessagingVersion) { return new Incompatible(closestSupportedVersion, maxMessagingVersion); } diff --git a/test/unit/org/apache/cassandra/net/StreamingTest.java b/test/unit/org/apache/cassandra/net/StreamingTest.java index 82b3ca3ba358..e2d2a8527ae4 100644 --- a/test/unit/org/apache/cassandra/net/StreamingTest.java +++ b/test/unit/org/apache/cassandra/net/StreamingTest.java @@ -79,7 +79,7 @@ public void setup() handshakeEx = null; } - private Future> streamingConnect(AcceptVersions acceptOutbound, AcceptVersions acceptInbound) throws ExecutionException, InterruptedException + private Result streamingConnect(AcceptVersions acceptOutbound, AcceptVersions acceptInbound) throws ExecutionException, InterruptedException { InboundSockets inbound = new InboundSockets(new InboundConnectionSettings().withAcceptMessaging(acceptInbound)); try @@ -94,7 +94,9 @@ private Future> streamingConnect(AcceptVersions SslFallbackConnectionType.SERVER_CONFIG ); result.awaitUninterruptibly(); - return result; + Assert.assertTrue(result.isSuccess()); + + return result.getNow(); } finally { @@ -105,31 +107,21 @@ private Future> streamingConnect(AcceptVersions @Test public void testIncompatibleVersion() throws InterruptedException, ExecutionException { - Future> result = streamingConnect(new AcceptVersions(current_version + 1, current_version + 1), new AcceptVersions(minimum_version + 2, current_version + 3)); - if (result.isSuccess()) { - Result nowResult = result.getNow(); - Assert.assertNull(nowResult.success()); - Assert.assertEquals(Result.Outcome.INCOMPATIBLE, nowResult.outcome); - Assert.assertEquals(current_version, nowResult.incompatible().closestSupportedVersion); - Assert.assertEquals(current_version, nowResult.incompatible().maxMessagingVersion); - - } else { - Assert.assertTrue(false); - } + Result nowResult = streamingConnect(new AcceptVersions(current_version + 1, current_version + 1), new AcceptVersions(minimum_version + 2, current_version + 3)); + Assert.assertNull(nowResult.success()); + Assert.assertEquals(Result.Outcome.INCOMPATIBLE, nowResult.outcome); + Assert.assertEquals(current_version, nowResult.incompatible().closestSupportedVersion); + Assert.assertEquals(current_version, nowResult.incompatible().maxMessagingVersion); } @Test public void testCompatibleVersion() throws InterruptedException, ExecutionException { - Future> result = streamingConnect(new AcceptVersions(MessagingService.minimum_version, current_version + 1), new AcceptVersions(minimum_version + 2, current_version + 3)); - if (result.isSuccess()) { - Result nowResult = result.getNow(); - Assert.assertNotNull(nowResult.success().channel); - Assert.assertEquals(Result.Outcome.SUCCESS, nowResult.outcome); - Assert.assertEquals(current_version, nowResult.success().messagingVersion); - } else { - Assert.assertTrue(false); - } + Result nowResult = streamingConnect(new AcceptVersions(MessagingService.minimum_version, current_version + 1), new AcceptVersions(minimum_version + 2, current_version + 3)); + Assert.assertNotNull(nowResult.success()); + Assert.assertNotNull(nowResult.success().channel); + Assert.assertEquals(Result.Outcome.SUCCESS, nowResult.outcome); + Assert.assertEquals(current_version, nowResult.success().messagingVersion); } private ServerEncryptionOptions getServerEncryptionOptions(SslFallbackConnectionType sslConnectionType, boolean optional) From 4e493dc4e56c55ad1faeeb004465bdadbe6567d3 Mon Sep 17 00:00:00 2001 From: vivekkoya <13vivekkoya@gmail.com> Date: Mon, 27 Oct 2025 20:16:27 -0700 Subject: [PATCH 3/5] Check outcomes prior to casting types --- .../apache/cassandra/net/OutboundConnectionInitiator.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java index db0213c1f85b..218bd17f42d8 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java @@ -526,7 +526,7 @@ private Result(Outcome outcome) boolean isSuccess() { return outcome == Outcome.SUCCESS; } public Success success() { - if (this instanceof Success) + if (this.outcome == outcome.SUCCESS) return (Success) this; return null; } @@ -534,7 +534,7 @@ public Success success() { static StreamingSuccess streamingSuccess(Channel channel, int messagingVersion) { return new StreamingSuccess(channel, messagingVersion); } public Retry retry() { - if (this instanceof Retry) + if (this.outcome == outcome.RETRY) return (Retry) this; return null; } @@ -542,7 +542,7 @@ public Retry retry() { public Incompatible incompatible() { - if (this instanceof Incompatible) + if (this.outcome == outcome.INCOMPATIBLE) return (Incompatible) this; return null; } From 9586eadef35506d7d1f556946ff5e545cb54102b Mon Sep 17 00:00:00 2001 From: vivekkoya <13vivekkoya@gmail.com> Date: Sun, 2 Nov 2025 23:38:25 -0800 Subject: [PATCH 4/5] Remove instanceof for subtype of Success (MessagingSuccess) --- src/java/org/apache/cassandra/net/OutboundConnection.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java index 8894e965ac9e..3adb00b16aca 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -53,6 +53,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.net.OutboundConnectionInitiator.Result.MessagingSuccess; +import org.apache.cassandra.net.OutboundConnectionInitiator.Result.Success; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -1127,7 +1128,7 @@ void onCompletedHandshake(Result result) // it is expected that close, if successful, has already cancelled us; so we do not need to worry about leaking connections assert !state.isClosed(); - if (result.success() instanceof MessagingSuccess) { + if (result.success() != null) { MessagingSuccess success = (MessagingSuccess) result.success(); debug.onConnect(success.messagingVersion, settings); state.disconnected().maintenance.cancel(false); From 62ed898593fedda1366b048eb98f757a32367054 Mon Sep 17 00:00:00 2001 From: vivekkoya <13vivekkoya@gmail.com> Date: Mon, 3 Nov 2025 23:18:44 -0800 Subject: [PATCH 5/5] Remove instanceof for subtype of Success (MessagingSuccess) --- src/java/org/apache/cassandra/net/OutboundConnection.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java index 3adb00b16aca..aa63441acf95 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -53,7 +53,6 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.net.OutboundConnectionInitiator.Result.MessagingSuccess; -import org.apache.cassandra.net.OutboundConnectionInitiator.Result.Success; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector;