[DO-NOT-MERGE] Prototype: runtime profiling of Python workers #52679
6b9e9ae to 39fb9dc
Looks really interesting. I started reading through it, but I'll wait for a more complete description so I understand what is in and out of scope a bit better.
}
// allow the user to set the batch size for the BatchedSerializer on UDFs
envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)
envVars.put("PYSPARK_RUNTIME_PROFILE", true.toString)
Obviously, this should be made configurable later.
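For illustration, here is a minimal sketch of how the Python worker side might gate profiling on that environment variable once it is configurable. The variable name `PYSPARK_RUNTIME_PROFILE` comes from the diff; the helper name `runtime_profile_enabled` and the parsing rules (case-insensitive `"true"`, default off) are assumptions of this sketch, not something the PR defines.

```python
import os


def runtime_profile_enabled(environ=os.environ) -> bool:
    """Return True only when PYSPARK_RUNTIME_PROFILE is set to "true".

    Hypothetical helper: the PR currently sets the variable unconditionally,
    so the default-off behaviour here is an assumption.
    """
    raw = environ.get("PYSPARK_RUNTIME_PROFILE", "false")
    return raw.strip().lower() == "true"


# Scala's true.toString yields "true", which this parser accepts.
enabled_example = runtime_profile_enabled({"PYSPARK_RUNTIME_PROFILE": "true"})
disabled_example = runtime_profile_enabled({})
```

Defaulting to off when the variable is missing keeps workers launched by an older JVM side from profiling unexpectedly.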
- case class PythonWorker(channel: SocketChannel) {
+ case class PythonWorker(
+     channel: SocketChannel,
+     extraChannel: Option[SocketChannel] = None) {
Let's call this the profile data channel, or something similar?
time.sleep(1)

def main(infile, outfile):
Maybe rename to outputs
pickled = pickle.dumps(stats)
write_with_length(pickled, outfile)
outfile.flush()
time.sleep(1)
I know yappi says it's fast, but a 1-second busy loop seems like it may be overkill. Should the interval be configurable?
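One possible shape for a configurable interval, as a rough sketch rather than a proposal for the final API. The environment variable `PYSPARK_RUNTIME_PROFILE_INTERVAL_SECONDS`, the helper names, and the 1-second default (taken from the `time.sleep(1)` in the diff) are all assumptions. It also swaps `time.sleep` for `threading.Event.wait`, so the loop can be stopped without waiting out a full interval.

```python
import math
import os
import threading

DEFAULT_INTERVAL_SECONDS = 1.0  # matches the hard-coded time.sleep(1) in the diff


def profile_interval_seconds(environ=os.environ) -> float:
    """Read the reporting interval from a hypothetical environment variable."""
    raw = environ.get("PYSPARK_RUNTIME_PROFILE_INTERVAL_SECONDS")
    if raw is None:
        return DEFAULT_INTERVAL_SECONDS
    try:
        value = float(raw)
    except ValueError:
        return DEFAULT_INTERVAL_SECONDS
    # Reject zero, negative, NaN, and infinite values.
    if not math.isfinite(value) or value <= 0:
        return DEFAULT_INTERVAL_SECONDS
    return value


def run_periodically(action, interval, stop_event, max_iterations=None):
    """Call action() every `interval` seconds until stop_event is set.

    max_iterations exists only to make the sketch easy to exercise.
    Returns the number of times action() was called.
    """
    count = 0
    while not stop_event.is_set():
        action()
        count += 1
        if max_iterations is not None and count >= max_iterations:
            break
        # Event.wait returns early if stop_event is set during the wait.
        stop_event.wait(interval)
    return count


calls = []
iterations = run_periodically(
    lambda: calls.append(1), 0.001, threading.Event(), max_iterations=3
)
```

In the worker, `action` would be whatever collects and writes the stats, and the stop event would be set when the task finishes.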
for thread in yappi.get_thread_stats():
    data = list(yappi.get_func_stats(ctx_id=thread.id))
    stats.extend([{str(k): str(v) for k, v in d.items()} for d in data])
pickled = pickle.dumps(stats)
Why pickle? Would JSON maybe make more sense, so we can interpret the data more easily in, say, the Spark UI in the future?
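To make the JSON suggestion concrete, here is a small sketch. Since the diff already converts every key and value to `str`, the stats are a list of string-to-string dicts and serialize to JSON directly. The `write_with_length` below is a local stand-in for the helper used in the diff; its 4-byte big-endian length prefix is an assumption of this sketch, and `read_with_length`, `encode_stats`, and the sample row are hypothetical.

```python
import io
import json
import struct


def write_with_length(payload: bytes, stream) -> None:
    """Local stand-in: write a length prefix, then the payload.

    The 4-byte big-endian signed prefix is an assumption of this sketch.
    """
    stream.write(struct.pack("!i", len(payload)))
    stream.write(payload)


def read_with_length(stream) -> bytes:
    """Hypothetical reader matching the framing above."""
    (length,) = struct.unpack("!i", stream.read(4))
    return stream.read(length)


def encode_stats(stats) -> bytes:
    """Serialize a list of str-to-str dicts as UTF-8 JSON instead of pickle."""
    return json.dumps(stats).encode("utf-8")


# Hypothetical sample row standing in for stringified profiler output.
sample_stats = [{"name": "my_udf", "ncall": "3", "ttot": "0.0125"}]

buffer = io.BytesIO()
write_with_length(encode_stats(sample_stats), buffer)

# Any JSON-capable reader (JVM side, Spark UI, a script) can decode the frame.
buffer.seek(0)
decoded = json.loads(read_with_length(buffer).decode("utf-8"))
```

Beyond readability, JSON avoids unpickling bytes received over a socket, and the JVM side can parse it without a Python-specific decoder.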
Let me make a new PR when it's ready.
What changes were proposed in this pull request?
TBD
Why are the changes needed?
TBD
Does this PR introduce any user-facing change?
TBD
How was this patch tested?
Screen.Recording.2025-10-24.at.12.18.24.PM.mov
Was this patch authored or co-authored using generative AI tooling?
No.