Processing Amazon SageMaker batch jobs with SciKit

Batch jobs are a good option for large datasets or if you do not need an immediate response to a model prediction request. The implementation is quite simple with the help of scikit-learn and SageMaker...

Batch jobs are a good option for large datasets or if you do not need an immediate response to a model prediction request. The implementation is quite simple with the help of scikit-learn and SageMaker.

You need two files. The first one where required data preprocessing will happen, and the second one with the actual execution of the SageMaker job.

In this example, let's call the first file with Python pseudocode preprocessing.py. It reads the dataset, makes cleanings and transformations, then splits it into training and test sets, and processes them.

import pandas as pd
import s3fs
from sklearn.model_selection import train_test_split
from sklearn.compose import make_column_transformer
from sklearn.preprocessing import StandardScaler,OneHotEncoder

# read dataset
data = pd.read_csv(<s3_dataset_path>)

# remove missing data and duplicated 
data.dropna(inplace=True)
data.drop_duplicates(inplace=True)

# split the dataset into training and test sets
X_train, X_test, y_train, y_test = train_test_split(..., test_size=0.2) 

# scale numerical features and to one-hot encode the categorical features
preprocessor = make_column_transformer(
    (StandardScaler(), [...numerical_features...]),
    (OneHotEncoder(), [...categorical_features...)
)

# process the training and test datasets
train_features = preprocess.fit_transform(X_train)
test_features = preprocess.transform(X_test)

# save the processed datasets, separating the features and labels
pd.DataFrame(train_features).to_csv(<train_features_output_path>, header=False, index=False)
pd.DataFrame(test_features).to_csv(<test_features_output_path>, header=False, index=False) 
y_train.to_csv(<train_labels_output_path>, header=False, index=False)
y_test.to_csv(<test_labels_output_path>, header=False, index=False)

In the second file, the code provisions a managed ML instance, pulls a container, and then runs the script on it

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor = SKLearnProcessor(
  framework_version='<version>',
  role=sagemaker.get_execution_role(),
  instance_type='<instance_type>',
  instance_count=1)

sklearn_processor.run(
  code='preprocessing.py',
  inputs=[
    ProcessingInput(
      source='<s3_dataset_path>',
      destination='/opt/ml/processing/input'
    )
  ],
  outputs=[
    ProcessingOutput(source='/opt/ml/processing/output/train'),
    ProcessingOutput(source='/opt/ml/processing/output/validation'),
    ProcessingOutput(source='/opt/ml/processing/output/test')
  ]
)    

Since SageMaker does instance management, once the job is complete, it terminates the instance. The account will be charged for the actual time used.


Looking for help? Reach me anytime.

Subscribe to AWS by Vlad Frantskevich

Don’t miss out on the latest issues. Sign up now to get updates.
jamie@example.com
Subscribe