/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.api.python

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import scala.jdk.CollectionConverters._

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.storage.{PythonWorkerLogBlockIdGenerator, PythonWorkerLogLine, RollingLogWriter}

/**
 * Manages Python UDF log capture and routing to per-worker log writers.
 *
 * This class parses Python worker output streams and routes marked log
 * messages to the appropriate rolling log writers, keyed by worker PID.
 * It works in both daemon and non-daemon modes.
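 *
 * A minimal usage sketch (hypothetical caller; the daemon process handle and
 * session id below are illustrative, not part of this class):
 * {{{
 *   val capture = new PythonWorkerLogCapture(sessionId = "spark-session-1")
 *   val wrapped = capture.wrapInputStream(daemonProcess.getInputStream)
 *   // Read `wrapped` as usual: marked log lines are stripped out and routed
 *   // to per-worker rolling log writers; all other output passes through.
 * }}}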
 */
private[python] class PythonWorkerLogCapture(
    sessionId: String,
    logMarker: String = "PYTHON_WORKER_LOGGING") extends Logging {

  // Map to track per-worker log writers: workerId(PID) -> (writer, sequenceId)
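  // A ConcurrentHashMap is used because the stream-reading thread and callers
  // of removeAndCloseWorkerLogWriter / closeAllWriters may touch it concurrently.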
  private val workerLogWriters = new ConcurrentHashMap[String, (RollingLogWriter, AtomicLong)]()

  /**
   * Creates an InputStream wrapper that captures Python UDF logs from the given stream.
   *
   * @param inputStream The input stream to wrap (typically daemon stdout or worker stdout)
   * @return A wrapped InputStream that captures and routes log messages
   */
  def wrapInputStream(inputStream: InputStream): InputStream = {
    new CaptureWorkerLogsInputStream(inputStream)
  }

  /**
   * Removes and closes the log writer for a specific worker.
   *
   * @param workerId The worker ID (typically PID as string)
   */
  def removeAndCloseWorkerLogWriter(workerId: String): Unit = {
    Option(workerLogWriters.remove(workerId)).foreach { case (writer, _) =>
      try {
        writer.close()
      } catch {
        case e: Exception =>
          logWarning(s"Failed to close log writer for worker $workerId", e)
      }
    }
  }

  /**
   * Closes all active worker log writers.
   */
  def closeAllWriters(): Unit = {
    workerLogWriters.values().asScala.foreach { case (writer, _) =>
      try {
        writer.close()
      } catch {
        case e: Exception =>
          logWarning("Failed to close log writer", e)
      }
    }
    workerLogWriters.clear()
  }

  /**
   * Gets or creates a log writer for the specified worker.
   *
   * @param workerId Unique identifier for the worker (typically PID)
   * @return Tuple of (RollingLogWriter, AtomicLong sequence counter)
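   *
   * Relies on [[java.util.concurrent.ConcurrentHashMap#computeIfAbsent]], so at
   * most one writer is created per worker even if multiple threads race here.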
   */
  private def getOrCreateLogWriter(workerId: String): (RollingLogWriter, AtomicLong) = {
    workerLogWriters.computeIfAbsent(workerId, _ => {
      val logWriter = SparkEnv.get.blockManager.getRollingLogWriter(
        new PythonWorkerLogBlockIdGenerator(sessionId, workerId)
      )
      (logWriter, new AtomicLong())
    })
  }

  /**
   * Processes a log line from a Python worker.
   *
   * @param line The complete line containing the log marker and JSON
   * @return The prefix (non-log content) that should be passed through
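   *
   * With the default marker, a worker with PID 12345 would emit a line such as
   * (the payload shape is illustrative):
   * {{{
   *   PYTHON_WORKER_LOGGING:12345:{"level": "INFO", "msg": "hello"}
   * }}}
   * An empty JSON payload signals that the worker has finished logging, and
   * its writer is removed and closed.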
   */
  private def processLogLine(line: String): String = {
    val markerIndex = line.indexOf(s"$logMarker:")
    if (markerIndex >= 0) {
      val prefix = line.substring(0, markerIndex)
      val markerAndJson = line.substring(markerIndex)

      // Parse: "<logMarker>:<pid>:<json>", e.g. "PYTHON_WORKER_LOGGING:12345:{json}"
      val parts = markerAndJson.split(":", 3)
      if (parts.length >= 3) {
        val workerId = parts(1) // The PID reported by the Python worker
        val json = parts(2)

        try {
          if (json.isEmpty) {
            // An empty payload means the worker is done; release its writer.
            removeAndCloseWorkerLogWriter(workerId)
          } else {
            val (writer, seqId) = getOrCreateLogWriter(workerId)
            writer.writeLog(
              PythonWorkerLogLine(System.currentTimeMillis(), seqId.getAndIncrement(), json)
            )
          }
        } catch {
          case e: Exception =>
            logWarning(s"Failed to write log for worker $workerId", e)
        }
      }
      prefix
    } else {
      line + System.lineSeparator()
    }
  }

  /**
   * InputStream wrapper that captures and processes Python UDF logs.
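   *
   * Lines are re-emitted with [[System.lineSeparator]], so the worker's
   * original line endings are normalized in the wrapped stream.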
   */
  private class CaptureWorkerLogsInputStream(in: InputStream) extends InputStream {

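    // ISO-8859-1 maps every byte to a char one-to-one, so decoding here and
    // re-encoding below preserves the bytes of each line exactly.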
    private[this] val reader = new BufferedReader(
      new InputStreamReader(in, StandardCharsets.ISO_8859_1))
    private[this] val temp = new Array[Byte](1)
    private[this] var buffer = ByteBuffer.allocate(0)

    override def read(): Int = {
      val n = read(temp)
      if (n <= 0) {
        -1
      } else {
        // Signed byte to unsigned integer
        temp(0) & 0xff
      }
    }

    override def read(b: Array[Byte], off: Int, len: Int): Int = {
      if (buffer.hasRemaining) {
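        // Serve buffered, already-processed content first. The absolute bulk
        // put (Java 13+) copies without disturbing buf's own position.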
        val buf = ByteBuffer.wrap(b, off, len)
        val remaining = Math.min(buffer.remaining(), buf.remaining())
        buf.put(buf.position(), buffer, buffer.position(), remaining)
        buffer.position(buffer.position() + remaining)
        remaining
      } else {
        val line = reader.readLine()
        if (line == null) {
          closeAllWriters()
          -1
        } else {
          val processedContent = if (line.contains(s"$logMarker:")) {
            processLogLine(line)
          } else {
            line + System.lineSeparator()
          }

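          // Stash the processed bytes and recurse: the next call either
          // drains this buffer or, if it is empty, reads the next line.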
          buffer = ByteBuffer.wrap(processedContent.getBytes(StandardCharsets.ISO_8859_1))
          read(b, off, len)
        }
      }
    }

    override def close(): Unit = {
      try {
        reader.close()
      } finally {
        closeAllWriters()
      }
    }
  }
}