diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java index 029808107..fd8784c4e 100644 --- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java @@ -47,6 +47,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.LongConsumer; import java.util.logging.Level; import java.util.stream.Collectors; @@ -196,8 +197,8 @@ public abstract class AbstractSession extends SessionHelper { protected int decoderState; protected int decoderLength; protected SshException discarding; - protected final Object encodeLock = new Object(); - protected final Object decodeLock = new Object(); + protected final ReentrantLock encodeLock = new ReentrantLock(); + protected final ReentrantLock decodeLock = new ReentrantLock(); protected final Object requestLock = new Object(); /** @@ -501,7 +502,8 @@ public MacInformation getMacInformation(boolean incoming) { * @throws Exception if an error occurs while decoding or handling the data */ public void messageReceived(Readable buffer) throws Exception { - synchronized (decodeLock) { + decodeLock.lock(); + try { decoderBuffer.putBuffer(buffer); // One of those properties will be set by the constructor and the other // one should be set by the readIdentification method @@ -513,6 +515,8 @@ public void messageReceived(Readable buffer) throws Exception { } } decode(); + } finally { + decodeLock.unlock(); } } @@ -718,13 +722,16 @@ protected IoWriteFuture sendNewKeys() throws Exception { prepareNewKeys(); Buffer buffer = createBuffer(SshConstants.SSH_MSG_NEWKEYS, Byte.SIZE); IoWriteFuture future; - synchronized (encodeLock) { + encodeLock.lock(); + try { // writePacket() would also work since it would never try to queue the packet, and would never try to // initiate a new KEX, and thus would never try to get the kexLock monitor. If it did, we might get a // deadlock due to lock inversion. It seems safer to push this out directly, though. future = doWritePacket(buffer); // Use the new settings from now on for any outgoing packet setOutputEncoding(); + } finally { + encodeLock.unlock(); } kexHandler.updateState(() -> kexState.set(KexState.KEYS)); @@ -1169,11 +1176,14 @@ protected IoWriteFuture doWritePacket(Buffer buffer) throws IOException { // Synchronize all write requests as needed by the encoding algorithm // and also queue the write request in this synchronized block to ensure // packets are sent in the correct order - synchronized (encodeLock) { + encodeLock.lock(); + try { Buffer packet = resolveOutputPacket(buffer); IoSession networkSession = getIoSession(); IoWriteFuture future = networkSession.writeBuffer(packet); return future; + } finally { + encodeLock.unlock(); } } diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java b/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java index 25108ce7e..54c465ebb 100644 --- a/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java +++ b/sshd-core/src/main/java/org/apache/sshd/server/session/AbstractServerSession.java @@ -317,7 +317,8 @@ public IoWriteFuture signalAuthenticationSuccess( Buffer response = createBuffer(SshConstants.SSH_MSG_USERAUTH_SUCCESS, Byte.SIZE); IoWriteFuture future; IoSession networkSession = getIoSession(); - synchronized (encodeLock) { + encodeLock.lock(); + try { Buffer packet = resolveOutputPacket(response); setUsername(username); @@ -327,6 +328,8 @@ public IoWriteFuture signalAuthenticationSuccess( // Now we can inform the peer that authentication is successful future = networkSession.writeBuffer(packet); + } finally { + encodeLock.unlock(); } resetIdleTimeout();