Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file not shown.
128 changes: 128 additions & 0 deletions wmla-samples/wmla4.0-cpd4.7/models/pytorch-model_hpo/DataParallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
"""
Multi-GPU Examples
==================

Data Parallelism is when we split the mini-batch of samples into
multiple smaller mini-batches and run the computation for each of the
smaller mini-batches in parallel.

Data Parallelism is implemented using ``torch.nn.DataParallel``.
One can wrap a Module in ``DataParallel`` and it will be parallelized
over multiple GPUs in the batch dimension.

DataParallel
-------------
"""
import torch
import torch.nn as nn


class DataParallelModel(nn.Module):

def __init__(self):
super().__init__()
self.block1 = nn.Linear(10, 20)

# wrap block2 in DataParallel
self.block2 = nn.Linear(20, 20)
self.block2 = nn.DataParallel(self.block2)

self.block3 = nn.Linear(20, 20)

def forward(self, x):
x = self.block1(x)
x = self.block2(x)
x = self.block3(x)
return x

########################################################################
# The code does not need to be changed in CPU-mode.
#
# The documentation for DataParallel can be found
# `here <https://pytorch.org/docs/nn.html#dataparallel>`_.
#
# **Primitives on which DataParallel is implemented upon:**
#
#
# In general, pytorch’s `nn.parallel` primitives can be used independently.
# We have implemented simple MPI-like primitives:
#
# - replicate: replicate a Module on multiple devices
# - scatter: distribute the input in the first-dimension
# - gather: gather and concatenate the input in the first-dimension
# - parallel\_apply: apply a set of already-distributed inputs to a set of
# already-distributed models.
#
# To give a better clarity, here function ``data_parallel`` composed using
# these collectives


def data_parallel(module, input, device_ids, output_device=None):
if not device_ids:
return module(input)

if output_device is None:
output_device = device_ids[0]

replicas = nn.parallel.replicate(module, device_ids)
inputs = nn.parallel.scatter(input, device_ids)
replicas = replicas[:len(inputs)]
outputs = nn.parallel.parallel_apply(replicas, inputs)
return nn.parallel.gather(outputs, output_device)

########################################################################
# Part of the model on CPU and part on the GPU
# --------------------------------------------
#
# Let’s look at a small example of implementing a network where part of it
# is on the CPU and part on the GPU

device = torch.device("cuda:0")

class DistributedModel(nn.Module):

def __init__(self):
super().__init__(
embedding=nn.Embedding(1000, 10),
rnn=nn.Linear(10, 10).to(device),
)

def forward(self, x):
# Compute embedding on CPU
x = self.embedding(x)

# Transfer to GPU
x = x.to(device)

# Compute RNN on GPU
x = self.rnn(x)
return x

########################################################################
#
# This was a small introduction to PyTorch for former Torch users.
# There’s a lot more to learn.
#
# Look at our more comprehensive introductory tutorial which introduces
# the ``optim`` package, data loaders etc.: :doc:`/beginner/deep_learning_60min_blitz`.
#
# Also look at
#
# - :doc:`Train neural nets to play video games </intermediate/reinforcement_q_learning>`
# - `Train a state-of-the-art ResNet network on imagenet`_
# - `Train an face generator using Generative Adversarial Networks`_
# - `Train a word-level language model using Recurrent LSTM networks`_
# - `More examples`_
# - `More tutorials`_
# - `Discuss PyTorch on the Forums`_
# - `Chat with other users on Slack`_
#
# .. _`Deep Learning with PyTorch: a 60-minute blitz`: https://github.com/pytorch/tutorials/blob/master/Deep%20Learning%20with%20PyTorch.ipynb
# .. _Train a state-of-the-art ResNet network on imagenet: https://github.com/pytorch/examples/tree/master/imagenet
# .. _Train an face generator using Generative Adversarial Networks: https://github.com/pytorch/examples/tree/master/dcgan
# .. _Train a word-level language model using Recurrent LSTM networks: https://github.com/pytorch/examples/tree/master/word_language_model
# .. _More examples: https://github.com/pytorch/examples
# .. _More tutorials: https://github.com/pytorch/tutorials
# .. _Discuss PyTorch on the Forums: https://discuss.pytorch.org/
# .. _Chat with other users on Slack: https://pytorch.slack.com/messages/beginner/
119 changes: 119 additions & 0 deletions wmla-samples/wmla4.0-cpd4.7/models/pytorch-model_hpo/emetrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# encoding=utf-8
import json
import time
import os

class EMetrics(object):
"""
Manage the logging of metrics on behalf of a python client training with a deep learning framework

Metrics recorded be passed to Watson Machine Learning and made available to WML clients

This will also output TEST_GROUP metrics to file val_dict_list.json to be read by the Hyper-Parameter-Optimization (HPO) algorithm

Example Usage:

from emetrics import EMetrics

with EMetrics.open("1") as metrics:
metrics.record(EMetrics.TEST_GROUP, 1, {"accuracy": 0.6}) # record TEST metric accuracy=0.6 after step 1
metrics.record(EMetrics.TEST_GROUP, 2, {"accuracy": 0.5}) # record TEST metric accuracy=0.5 after step 2
metrics.record(EMetrics.TEST_GROUP, 3, {"accuracy": 0.9}) # record TEST metric accuracy=0.9 after step 3
"""

TEST_GROUP = "test" # standard group name for metrics collected on test dataset (also referred to as holdout or validation dataset)
TRAIN_GROUP = "train" # standard group name for metrics collected on training dataset

def __init__(self,subId,f):
if "TRAINING_ID" in os.environ:
self.trainingId = os.environ["TRAINING_ID"]
else:
self.trainingId = ""
self.rIndex = 1
self.subId = subId
self.f = f
self.test_history = []

def __enter__(self):
return self

def __exit__(self, type, value, tb):
self.close()

@staticmethod
def open(subId=None):
"""
Open and return an EMetrics object

:param subId: optional, the string identifier of an HPO sub-execution (only used in HPO, caller can get the subId from the SUBID environment variable)
:return: EMetrics object
"""
if "LOG_DIR" in os.environ:
folder = os.environ["LOG_DIR"]
elif "JOB_STATE_DIR" in os.environ:
folder = os.path.join(os.environ["JOB_STATE_DIR"],"logs")
else:
folder = "/tmp"

if subId is not None:
folder = os.path.join(folder, subId)

if not os.path.exists(folder):
os.makedirs(folder)

f = open(os.path.join(folder, "evaluation-metrics.txt"), "a")
return EMetrics(subId,f)

def __encode(self,value):
if isinstance(value,int):
return { "type":2, "value": str(value) }
if isinstance(value,float):
return {"type": 3, "value": str(value) }
return { "value": str(value) }

def record(self,group,iteration,values):
"""
Record a set of metrics for a particular group and iteration

:param group: a string identifying how the metrics were computed. Use EMetrics.TEST_GROUP for validation/test data metrics.
:param iteration: an integer indicating the iteration/step/epoch at which the metrics were computed
:param values: a dict containing one or more named metrics (values may be string, float or integer)
"""
if group == EMetrics.TEST_GROUP and self.subId:
d = {"steps": iteration}
d.update(values)
self.test_history.append(d)

obj = {
"meta": {
"training_id":self.trainingId,
"time": int(time.time()*1000),
"rindex": self.rIndex
},
"grouplabel":group,
"etimes": {
"iteration":self.__encode(iteration),
"time_stamp":self.__encode(time.strftime("%Y-%m-%dT%H:%M:%S.%s"))
},
"values": { k:self.__encode(v) for k,v in values.items() }
}

if self.subId:
obj["meta"]["subid"] = str(self.subId)

if self.f:
self.f.write(json.dumps(obj) + "\n")
self.f.flush()

def close(self):
if self.f:
self.f.close()
if "RESULT_DIR" in os.environ:
folder = os.environ["RESULT_DIR"] # should use LOG_DIR?
else:
folder = "/tmp"
if self.subId:
open(os.path.join(folder,"val_dict_list.json"),"w").write(json.dumps(self.test_history))



Loading