diff --git a/Sources/NIOCore/IO.swift b/Sources/NIOCore/IO.swift index 59d7cced5ec..61e25f677d1 100644 --- a/Sources/NIOCore/IO.swift +++ b/Sources/NIOCore/IO.swift @@ -62,7 +62,7 @@ public struct IOError: Swift.Error { .reason(self.failureDescription) } - private enum Error { + package enum Error { #if os(Windows) case windows(DWORD) case winsock(CInt) @@ -70,7 +70,7 @@ public struct IOError: Swift.Error { case errno(CInt) } - private let error: Error + package let error: Error /// The `errno` that was set for the operation. public var errnoCode: CInt { diff --git a/Sources/NIOHTTP1Server/main.swift b/Sources/NIOHTTP1Server/main.swift index ac1752e8d31..04812465e32 100644 --- a/Sources/NIOHTTP1Server/main.swift +++ b/Sources/NIOHTTP1Server/main.swift @@ -15,6 +15,9 @@ import NIOCore import NIOHTTP1 import NIOPosix +#if os(Windows) +import WinSDK +#endif extension String { func chopPrefix(_ prefix: String) -> String? { @@ -664,7 +667,11 @@ let channel = try { () -> Channel in case .unixDomainSocket(let path): return try socketBootstrap.bind(unixDomainSocketPath: path).wait() case .stdio: - return try pipeBootstrap.takingOwnershipOfDescriptors(input: STDIN_FILENO, output: STDOUT_FILENO).wait() + #if os(Windows) + return try pipeBootstrap.takingOwnershipOfDescriptors(input: Int32(bitPattern: STD_INPUT_HANDLE), output: Int32(bitPattern: STD_OUTPUT_HANDLE)).wait() + #else + return try pipeBootstrap.takingOwnershipOfDescriptors(input: STDIN_FILENO, output: STDOUT_FILENO).wait() + #endif } }() diff --git a/Sources/NIOPosix/BSDSocketAPIWindows.swift b/Sources/NIOPosix/BSDSocketAPIWindows.swift index b51aaca02bb..80e230c45a2 100644 --- a/Sources/NIOPosix/BSDSocketAPIWindows.swift +++ b/Sources/NIOPosix/BSDSocketAPIWindows.swift @@ -195,7 +195,11 @@ extension NIOBSDSocket { ) throws -> NIOBSDSocket.Handle? { let socket: NIOBSDSocket.Handle = WinSDK.accept(s, addr, addrlen) if socket == WinSDK.INVALID_SOCKET { - throw IOError(winsock: WSAGetLastError(), reason: "accept") + let lastError = WSAGetLastError() + if lastError == WSAEWOULDBLOCK { + return nil + } + throw IOError(winsock: lastError, reason: "accept") } return socket } @@ -226,7 +230,7 @@ extension NIOBSDSocket { ) throws -> Bool { if WinSDK.connect(s, name, namelen) == SOCKET_ERROR { let iResult = WSAGetLastError() - if iResult == WSAEWOULDBLOCK { return true } + if iResult == WSAEWOULDBLOCK { return false } throw IOError(winsock: WSAGetLastError(), reason: "connect") } return true diff --git a/Sources/NIOPosix/BaseSocketChannel.swift b/Sources/NIOPosix/BaseSocketChannel.swift index d928416451a..c772c0fd130 100644 --- a/Sources/NIOPosix/BaseSocketChannel.swift +++ b/Sources/NIOPosix/BaseSocketChannel.swift @@ -15,6 +15,9 @@ import Atomics import NIOConcurrencyHelpers import NIOCore +#if os(Windows) +import WinSDK +#endif private struct SocketChannelLifecycleManager { // MARK: Types @@ -1215,7 +1218,7 @@ class BaseSocketChannel: SelectableChannel, Chan /// - err: The `Error` which was thrown by `readFromSocket`. /// - Returns: `true` if the `Channel` should be closed, `false` otherwise. func shouldCloseOnReadError(_ err: Error) -> Bool { - true + return true } /// Handles an error reported by the selector. diff --git a/Sources/NIOPosix/SelectorGeneric.swift b/Sources/NIOPosix/SelectorGeneric.swift index b538e3f4f5d..8e3b41aa9fc 100644 --- a/Sources/NIOPosix/SelectorGeneric.swift +++ b/Sources/NIOPosix/SelectorGeneric.swift @@ -216,7 +216,15 @@ internal class Selector { @usableFromInline typealias EventType = WinSDK.pollfd @usableFromInline - var pollFDs = [WinSDK.pollfd]() + var pollFDs = [pollfd]() + @usableFromInline + var deregisteredFDs = [Bool]() + /// The read end of the wakeup socket pair. This is monitored in WSAPoll to allow waking up the event loop. + @usableFromInline + var wakeupReadSocket: NIOBSDSocket.Handle = NIOBSDSocket.invalidHandle + /// The write end of the wakeup socket pair. Writing to this socket wakes up the event loop. + @usableFromInline + var wakeupWriteSocket: NIOBSDSocket.Handle = NIOBSDSocket.invalidHandle #else #error("Unsupported platform, no suitable selector backend (we need kqueue or epoll support)") #endif diff --git a/Sources/NIOPosix/SelectorWSAPoll.swift b/Sources/NIOPosix/SelectorWSAPoll.swift index c02cf1d3bee..6f3ba39bfbe 100644 --- a/Sources/NIOPosix/SelectorWSAPoll.swift +++ b/Sources/NIOPosix/SelectorWSAPoll.swift @@ -14,6 +14,7 @@ #if os(Windows) import CNIOWindows +import NIOConcurrencyHelpers import NIOCore import WinSDK @@ -67,10 +68,74 @@ extension Selector: _SelectorBackendProtocol { func initialiseState0() throws { self.pollFDs.reserveCapacity(16) + self.deregisteredFDs.reserveCapacity(16) + + // Create a loopback socket pair for wakeup signaling. + // Since Windows doesn't support socketpair(), we create a pair of connected + // loopback UDP sockets to wake up the event loop. + let (readSocket, writeSocket) = try Self.createWakeupSocketPair() + self.wakeupReadSocket = readSocket + self.wakeupWriteSocket = writeSocket + + // Add the read end to pollFDs so WSAPoll will wake up when data is written to the write end + let wakeupPollFD = pollfd(fd: UInt64(readSocket), events: Int16(WinSDK.POLLRDNORM), revents: 0) + self.pollFDs.append(wakeupPollFD) + self.deregisteredFDs.append(false) + + self.lifecycleState = .open + } + + /// Creates a pair of connected loopback UDP sockets for wakeup signaling. + /// Returns (readSocket, writeSocket) tuple. + private static func createWakeupSocketPair() throws -> (NIOBSDSocket.Handle, NIOBSDSocket.Handle) { + // Create a UDP socket to receive wakeup signals + let readSocket = try NIOBSDSocket.socket(domain: .inet, type: .datagram, protocolSubtype: .default) + + do { + // Bind to loopback on an ephemeral port + var addr = sockaddr_in() + addr.sin_family = ADDRESS_FAMILY(AF_INET) + addr.sin_addr.S_un.S_addr = WinSDK.htonl(UInt32(bitPattern: INADDR_LOOPBACK)) + addr.sin_port = 0 // Let the system assign a port + + try withUnsafeBytes(of: &addr) { addrPtr in + let socketPointer = addrPtr.baseAddress!.assumingMemoryBound(to: sockaddr.self) + try NIOBSDSocket.bind(socket: readSocket, address: socketPointer, address_len: socklen_t(MemoryLayout.size)) + } + + // Get the assigned port + var boundAddr = sockaddr() + var size = Int32(MemoryLayout.size) + try NIOBSDSocket.getsockname(socket: readSocket, address: &boundAddr, address_len: &size) + + // Create the write socket + let writeSocket = try NIOBSDSocket.socket(domain: .inet, type: .datagram, protocolSubtype: .default) + + do { + // Connect the write socket to the read socket's address + guard try NIOBSDSocket.connect(socket: writeSocket, address: &boundAddr, address_len: size) else { + throw IOError(winsock: WSAGetLastError(), reason: "connect") + } + return (readSocket, writeSocket) + } catch { + _ = try? NIOBSDSocket.close(socket: writeSocket) + throw error + } + } catch { + _ = try? NIOBSDSocket.close(socket: readSocket) + throw error + } } func deinitAssertions0() { - // no global state. nothing to check + assert( + self.wakeupReadSocket == NIOBSDSocket.invalidHandle, + "wakeupReadSocket == \(self.wakeupReadSocket) in deinitAssertions0, forgot close?" + ) + assert( + self.wakeupWriteSocket == NIOBSDSocket.invalidHandle, + "wakeupWriteSocket == \(self.wakeupWriteSocket) in deinitAssertions0, forgot close?" + ) } @inlinable @@ -91,51 +156,65 @@ extension Selector: _SelectorBackendProtocol { Int32(clamping: timeAmount.nanoseconds / 1_000_000) } - // WSAPoll requires at least one pollFD structure. If we don't have any pending IO - // we should just sleep. By passing true as the second argument our el can be - // woken up by an APC (Asynchronous Procedure Call). - if self.pollFDs.isEmpty { - if time > 0 { - SleepEx(UInt32(time), true) - } else if time == -1 { - SleepEx(INFINITE, true) - } - } else { - let result = self.pollFDs.withUnsafeMutableBufferPointer { ptr in - WSAPoll(ptr.baseAddress!, UInt32(ptr.count), time) - } + precondition(!self.pollFDs.isEmpty, "pollFDs should never be empty here, since we need an eventFD for waking up on demand") + // We always have at least the wakeup socket in pollFDs + let result = self.pollFDs.withUnsafeMutableBufferPointer { ptr in + WSAPoll(ptr.baseAddress!, UInt32(ptr.count), time) + } - if result > 0 { - // something has happened - for i in self.pollFDs.indices { - let pollFD = self.pollFDs[i] - guard pollFD.revents != 0 else { - continue - } - // reset the revents - self.pollFDs[i].revents = 0 - let fd = pollFD.fd + if result > 0 { + // something has happened + for i in self.pollFDs.indices { + let pollFD = self.pollFDs[i] + guard pollFD.revents != 0 else { + continue + } + // reset the revents + self.pollFDs[i].revents = 0 + let fd = pollFD.fd - // If the registration is not in the Map anymore we deregistered it during the processing of whenReady(...). In this case just skip it. - guard let registration = registrations[Int(fd)] else { - continue + // Check if this is the wakeup socket + if NIOBSDSocket.Handle(fd) == self.wakeupReadSocket { + // Drain the wakeup socket by reading the data that was sent + var buffer: UInt8 = 0 + _ = withUnsafeMutablePointer(to: &buffer) { ptr in + WinSDK.recv(self.wakeupReadSocket, ptr, 1, 0) } + continue + } - var selectorEvent = SelectorEventSet(revents: pollFD.revents) - // in any case we only want what the user is currently registered for & what we got - selectorEvent = selectorEvent.intersection(registration.interested) + // If the registration is not in the Map anymore we deregistered it during the processing of whenReady(...). In this case just skip it. + guard let registration = registrations[Int(fd)] else { + continue + } - guard selectorEvent != ._none else { - continue - } + var selectorEvent = SelectorEventSet(revents: pollFD.revents) + // in any case we only want what the user is currently registered for & what we got + selectorEvent = selectorEvent.intersection(registration.interested) - try body((SelectorEvent(io: selectorEvent, registration: registration))) + guard selectorEvent != ._none else { + continue + } + + try body((SelectorEvent(io: selectorEvent, registration: registration))) + } + + // now clean up any deregistered fds + // In reverse order so we don't have to copy elements out of the array + // If we do in in normal order, we'll have to shift all elements after the removed one + for i in self.deregisteredFDs.indices.reversed() { + if self.deregisteredFDs[i] { + // remove this one + let fd = self.pollFDs[i].fd + self.pollFDs.remove(at: i) + self.deregisteredFDs.remove(at: i) + self.registrations.removeValue(forKey: Int(fd)) } - } else if result == 0 { - // nothing has happened - } else if result == WinSDK.SOCKET_ERROR { - throw IOError(winsock: WSAGetLastError(), reason: "WSAPoll") } + } else if result == 0 { + // nothing has happened + } else if result == WinSDK.SOCKET_ERROR { + throw IOError(winsock: WSAGetLastError(), reason: "WSAPoll") } } @@ -149,6 +228,7 @@ extension Selector: _SelectorBackendProtocol { // that will allow O(1) access here. let poll = pollfd(fd: UInt64(fileDescriptor), events: interested.wsaPollEvent, revents: 0) self.pollFDs.append(poll) + self.deregisteredFDs.append(false) } func reregister0( @@ -158,7 +238,9 @@ extension Selector: _SelectorBackendProtocol { newInterested: SelectorEventSet, registrationID: SelectorRegistrationID ) throws { - fatalError("TODO: Unimplemented") + if let index = self.pollFDs.firstIndex(where: { $0.fd == UInt64(fileDescriptor) }) { + self.pollFDs[index].events = newInterested.wsaPollEvent + } } func deregister0( @@ -167,29 +249,40 @@ extension Selector: _SelectorBackendProtocol { oldInterested: SelectorEventSet, registrationID: SelectorRegistrationID ) throws { - fatalError("TODO: Unimplemented") + if let index = self.pollFDs.firstIndex(where: { $0.fd == UInt64(fileDescriptor) }) { + self.deregisteredFDs[index] = true + } } func wakeup0() throws { - // will be called from a different thread - let result = try self.myThread.withHandleUnderLock { handle in - QueueUserAPC(wakeupTarget, handle, 0) - } - if result == 0 { - let errorCode = GetLastError() - if let errorMsg = Windows.makeErrorMessageFromCode(errorCode) { - throw IOError(errnoCode: Int32(errorCode), reason: errorMsg) + // Will be called from a different thread. + // Write a single byte to the wakeup socket to wake up the event loop. + try self.externalSelectorFDLock.withLock { + guard self.wakeupWriteSocket != NIOBSDSocket.invalidHandle else { + throw EventLoopError.shutdown + } + var byte: UInt8 = 0 + let result = withUnsafePointer(to: &byte) { ptr in + WinSDK.send(self.wakeupWriteSocket, ptr, 1, 0) + } + if result == SOCKET_ERROR { + throw IOError(winsock: WSAGetLastError(), reason: "send (wakeup)") } } } func close0() throws { + // Close the wakeup sockets + if self.wakeupReadSocket != NIOBSDSocket.invalidHandle { + try? NIOBSDSocket.close(socket: self.wakeupReadSocket) + self.wakeupReadSocket = NIOBSDSocket.invalidHandle + } + if self.wakeupWriteSocket != NIOBSDSocket.invalidHandle { + try? NIOBSDSocket.close(socket: self.wakeupWriteSocket) + self.wakeupWriteSocket = NIOBSDSocket.invalidHandle + } self.pollFDs.removeAll() + self.deregisteredFDs.removeAll() } } - -private func wakeupTarget(_ ptr: UInt64) { - // This is the target of our wakeup call. - // We don't really need to do anything here. We just need any target -} #endif diff --git a/Sources/NIOPosix/SocketChannel.swift b/Sources/NIOPosix/SocketChannel.swift index 504c2ad8347..10346bfe38b 100644 --- a/Sources/NIOPosix/SocketChannel.swift +++ b/Sources/NIOPosix/SocketChannel.swift @@ -383,12 +383,12 @@ final class ServerSocketChannel: BaseSocketChannel, @unchecked Sen } guard let err = err as? IOError else { return true } - switch err.errnoCode { - case ECONNABORTED, - EMFILE, - ENFILE, - ENOBUFS, - ENOMEM: + switch err.error { + case .errno(ECONNABORTED), + .errno(EMFILE), + .errno(ENFILE), + .errno(ENOBUFS), + .errno(ENOMEM): // These are errors we may be able to recover from. The user may just want to stop accepting connections for example // or provide some other means of back-pressure. This could be achieved by a custom ChannelDuplexHandler. return false @@ -856,14 +856,22 @@ final class DatagramChannel: BaseSocketChannel, @unchecked Sendable { private func shouldCloseOnErrnoCode(_ errnoCode: CInt) -> Bool { switch errnoCode { + case ECONNREFUSED, ENOMEM: + // These are errors we may be able to recover from. + return false + default: + return true + } + } + + private func shouldCloseOnError(_ error: IOError.Error) -> Bool { + switch error { // ECONNREFUSED can happen on linux if the previous sendto(...) failed. // See also: // - https://bugzilla.redhat.com/show_bug.cgi?id=1375 // - https://lists.gt.net/linux/kernel/39575 - case ECONNREFUSED, - ENOMEM: - // These are errors we may be able to recover from. - return false + case .errno(let code): + return self.shouldCloseOnErrnoCode(code) default: return true } @@ -871,7 +879,7 @@ final class DatagramChannel: BaseSocketChannel, @unchecked Sendable { override func shouldCloseOnReadError(_ err: Error) -> Bool { guard let err = err as? IOError else { return true } - return self.shouldCloseOnErrnoCode(err.errnoCode) + return self.shouldCloseOnError(err.error) } override func error() -> ErrorResult { diff --git a/Sources/NIOPosix/System.swift b/Sources/NIOPosix/System.swift index 576f75c6746..b6561441e3b 100644 --- a/Sources/NIOPosix/System.swift +++ b/Sources/NIOPosix/System.swift @@ -482,7 +482,8 @@ internal enum Posix: Sendable { #else @usableFromInline static var UIO_MAXIOV: Int { - fatalError("unsupported OS") + // TODO: This is a placeholder value. Find the correct one for Windows. + return 64 } @usableFromInline static var SHUT_RD: Int { diff --git a/Sources/NIOPosix/Thread.swift b/Sources/NIOPosix/Thread.swift index b730aba1b25..77640fca9cb 100644 --- a/Sources/NIOPosix/Thread.swift +++ b/Sources/NIOPosix/Thread.swift @@ -92,18 +92,14 @@ final class NIOThread: Sendable { static var currentThreadName: String? { #if os(Windows) - ThreadOpsSystem.threadName(.init(GetCurrentThread())) + ThreadOpsSystem.threadName(.init(handle: GetCurrentThread())) #else ThreadOpsSystem.threadName(.init(handle: pthread_self())) #endif } static var currentThreadID: UInt { - #if os(Windows) - UInt(bitPattern: .init(bitPattern: ThreadOpsSystem.currentThread)) - #else UInt(bitPattern: .init(bitPattern: ThreadOpsSystem.currentThread.handle)) - #endif } @discardableResult diff --git a/Sources/NIOPosix/ThreadWindows.swift b/Sources/NIOPosix/ThreadWindows.swift index 4da8f48b01c..fcb2d4bf4b2 100644 --- a/Sources/NIOPosix/ThreadWindows.swift +++ b/Sources/NIOPosix/ThreadWindows.swift @@ -18,13 +18,15 @@ import WinSDK typealias ThreadOpsSystem = ThreadOpsWindows enum ThreadOpsWindows: ThreadOps { - typealias ThreadHandle = HANDLE + struct ThreadHandle: @unchecked Sendable { + let handle: HANDLE + } typealias ThreadSpecificKey = DWORD typealias ThreadSpecificKeyDestructor = @convention(c) (UnsafeMutableRawPointer?) -> Void static func threadName(_ thread: ThreadOpsSystem.ThreadHandle) -> String? { var pszBuffer: PWSTR? - GetThreadDescription(thread, &pszBuffer) + GetThreadDescription(thread.handle, &pszBuffer) guard let buffer = pszBuffer else { return nil } let string: String = String(decodingCString: buffer, as: UTF16.self) LocalFree(buffer) @@ -41,11 +43,27 @@ enum ThreadOpsWindows: ThreadOps { let routine: @convention(c) (UnsafeMutableRawPointer?) -> CUnsignedInt = { let boxed = Unmanaged.fromOpaque($0!).takeRetainedValue() let (body, name) = (boxed.value.body, boxed.value.name) - let hThread: ThreadOpsSystem.ThreadHandle = GetCurrentThread() + + // Get a real thread handle instead of pseudo-handle + var realHandle: HANDLE? = nil + let success = DuplicateHandle( + GetCurrentProcess(), // Source process + GetCurrentThread(), // Source handle (pseudo-handle) + GetCurrentProcess(), // Target process + &realHandle, // Target handle (real handle) + 0, // Desired access (0 = same as source) + false, // Inherit handle + DWORD(DUPLICATE_SAME_ACCESS) // Options + ) + + guard success, let realHandle else { + fatalError("DuplicateHandle failed: \(GetLastError())") + } + let hThread = ThreadOpsSystem.ThreadHandle(handle: realHandle) if let name = name { _ = name.withCString(encodedAs: UTF16.self) { - SetThreadDescription(hThread, $0) + SetThreadDescription(hThread.handle, $0) } } @@ -58,15 +76,28 @@ enum ThreadOpsWindows: ThreadOps { } static func isCurrentThread(_ thread: ThreadOpsSystem.ThreadHandle) -> Bool { - CompareObjectHandles(thread, GetCurrentThread()) + CompareObjectHandles(thread.handle, GetCurrentThread()) } static var currentThread: ThreadOpsSystem.ThreadHandle { - GetCurrentThread() + var realHandle: HANDLE? = nil + let success = DuplicateHandle( + GetCurrentProcess(), + GetCurrentThread(), + GetCurrentProcess(), + &realHandle, + 0, + false, + DWORD(DUPLICATE_SAME_ACCESS) + ) + guard success, let realHandle else { + fatalError("DuplicateHandle failed: \(GetLastError())") + } + return ThreadHandle(handle: realHandle) } static func joinThread(_ thread: ThreadOpsSystem.ThreadHandle) { - let dwResult: DWORD = WaitForSingleObject(thread, INFINITE) + let dwResult: DWORD = WaitForSingleObject(thread.handle, INFINITE) assert(dwResult == WAIT_OBJECT_0, "WaitForSingleObject: \(GetLastError())") } @@ -88,7 +119,7 @@ enum ThreadOpsWindows: ThreadOps { } static func compareThreads(_ lhs: ThreadOpsSystem.ThreadHandle, _ rhs: ThreadOpsSystem.ThreadHandle) -> Bool { - CompareObjectHandles(lhs, rhs) + CompareObjectHandles(lhs.handle, rhs.handle) } }