Processing Amazon SageMaker batch jobs with scikit-learn
Batch jobs are a good option for large datasets or if you do not need an immediate response to a model prediction request. The implementation is quite simple with the help of scikit-learn and SageMaker.
You need two files: one that performs the required data preprocessing, and one that launches the SageMaker job.
In this example, the first file is called preprocessing.py and is written as Python pseudocode. It reads the dataset, cleans it, splits it into training and test sets, and then scales and encodes the features of both sets.
import pandas as pd
import s3fs  # not called directly; pandas needs it installed to read and write s3:// paths
from sklearn.compose import make_column_transformer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
# read dataset
data = pd.read_csv(<s3_dataset_path>)
# remove rows with missing values and duplicate rows
data.dropna(inplace=True)
data.drop_duplicates(inplace=True)
# split the dataset into training and test sets
X_train, X_test, y_train, y_test = train_test_split(..., test_size=0.2)
# scale the numerical features and one-hot encode the categorical features
preprocessor = make_column_transformer(
    (StandardScaler(), [...numerical_features...]),
    (OneHotEncoder(), [...categorical_features...]),
)
# process the training and test datasets
# fit on the training set only, then apply the same transformation to the test set
train_features = preprocessor.fit_transform(X_train)
test_features = preprocessor.transform(X_test)
# save the processed datasets, separating the features and labels
pd.DataFrame(train_features).to_csv(<train_features_output_path>, header=False, index=False)
pd.DataFrame(test_features).to_csv(<test_features_output_path>, header=False, index=False)
y_train.to_csv(<train_labels_output_path>, header=False, index=False)
y_test.to_csv(<test_labels_output_path>, header=False, index=False)
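The pseudocode above leaves the dataset and column names open. As a rough illustration, here is a minimal runnable sketch of the same steps on a tiny in-memory table. The column names (age, income, city, label) and the values are made up for this example, and two options are assumptions that are not part of the original script: handle_unknown="ignore", so that a category seen only in the test set does not raise an error, and sparse_threshold=0, so that the transformer always returns a dense array that pd.DataFrame can wrap directly.

```python
import pandas as pd
from sklearn.compose import make_column_transformer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Hypothetical toy data standing in for the real dataset
data = pd.DataFrame({
    "age": [25, 32, 47, 51, 38, 29, 44, 36, 60, 23],
    "income": [30.0, 42.5, 80.0, 95.0, 55.0, 38.0, 72.0, 50.0, 110.0, 28.0],
    "city": ["A", "B", "A", "C", "B", "A", "C", "B", "A", "C"],
    "label": [0, 0, 1, 1, 0, 0, 1, 0, 1, 0],
})

# remove rows with missing values and duplicate rows
data = data.dropna().drop_duplicates()

# separate features from the label, then split 80/20
X = data.drop(columns="label")
y = data["label"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=0
)

# scale numerical columns, one-hot encode the categorical column
preprocessor = make_column_transformer(
    (StandardScaler(), ["age", "income"]),
    (OneHotEncoder(handle_unknown="ignore"), ["city"]),
    sparse_threshold=0,  # assumption: force dense output
)

# fit on the training set only, then reuse the fitted transformer
train_features = preprocessor.fit_transform(X_train)
test_features = preprocessor.transform(X_test)

train_df = pd.DataFrame(train_features)
test_df = pd.DataFrame(test_features)
```

Fitting on the training set and only transforming the test set keeps test-set statistics out of the scaler, which is the same order the pseudocode follows.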
In the second file, the code provisions a managed ML instance, pulls a scikit-learn container, and then runs the preprocessing script on it:
import sagemaker
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor

# describe the managed instance and the scikit-learn container to use
sklearn_processor = SKLearnProcessor(
    framework_version='<version>',
    role=sagemaker.get_execution_role(),
    instance_type='<instance_type>',
    instance_count=1,
)

# run preprocessing.py inside the container
sklearn_processor.run(
    code='preprocessing.py',
    inputs=[
        # the S3 data is copied to this local path in the container
        ProcessingInput(
            source='<s3_dataset_path>',
            destination='/opt/ml/processing/input',
        )
    ],
    outputs=[
        # files written to these local paths are uploaded to S3 when the job ends
        ProcessingOutput(source='/opt/ml/processing/output/train'),
        ProcessingOutput(source='/opt/ml/processing/output/validation'),
        ProcessingOutput(source='/opt/ml/processing/output/test'),
    ],
)
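The job definition above refers to local directories inside the container: the input is made available under /opt/ml/processing/input, and whatever the script writes under the /opt/ml/processing/output/... directories is uploaded when the job finishes. One way for preprocessing.py to use these paths is sketched below. The helper names find_inputs and write_split, the base_dir parameter, and the one-CSV-per-split layout are all hypothetical choices for this sketch and not part of the SageMaker SDK. The base_dir parameter only exists so the same code can be tried on a local folder.

```python
import csv
from pathlib import Path

# Default root used by the job definition; override it to test locally.
DEFAULT_BASE_DIR = "/opt/ml/processing"


def find_inputs(base_dir=DEFAULT_BASE_DIR):
    """Return the CSV files found in <base_dir>/input, sorted by name."""
    return sorted((Path(base_dir) / "input").glob("*.csv"))


def write_split(split, rows, base_dir=DEFAULT_BASE_DIR):
    """Write rows to <base_dir>/output/<split>/<split>.csv and return the path."""
    out_dir = Path(base_dir) / "output" / split
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"{split}.csv"
    with out_path.open("w", newline="") as f:
        csv.writer(f).writerows(rows)
    return out_path
```

Inside the container, write_split("train", rows) would then land in /opt/ml/processing/output/train, which matches the first ProcessingOutput in the job definition.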
Because SageMaker manages the instance, it terminates the instance once the job is complete. The account is charged for the time actually used.