streamsx.wml package

IBM Streams WML integration

This package exposes WML online scoring as Python methods to be used in a streamsx topology based streaming application.

For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:

Changes

v1.1.0:

  • replace the deprecated WML client

v1.0.3:

  • documentation updates: links in description updated

v1.0.1:

  • deprecate: parameter expected_load, but will be supported for backward compatibility if new parameter bundle_size is not provided

  • new: parameter bundles_size determines the bundle size directly

Overview

Provides functions to use Watson Machine Learning (WML) online scoring in topology based streaming applications. All models which are supported by WML for online scoring can be used to score streaming data in a topology application. The models have to be created with tools provided by WML and Cloud Pak for Data. They need to be stored in WML repository and published as an online deployment.

This package is designed for high tuple throughput. It is using the WML feature of sending multiple input data within a single scoring request to the online scoring REST endpoint. This minimizes the communication overhead. On the other side it will increase latency, but only in milli second region.

Quick ref to use this package

  1. Create a model and store it in CP4D WML repository There are good tutorials available for the different supported ML frameworks. To be able to store a model in CP4D WML repository you need to create a ‘deployment space’. You can create different spaces for different scenarioa (test, dev, production). Depending on the tool you are using to create and store your model you will be asked to choose or create a ‘deployment space’. In Jupyter notebook you need to assign the ‘space id’ to your created WML client. You can also assign a space to your project, which can now be used as ‘deployment space’ in the notebook.

  2. Create a WML online scoring deployment You create a deployment by choosing a stored model ID and the type of deployment. Again you can do it via CP4D GUI as context specific action on a shown model or via Python API e.g. in a notebook. You will get information about the progress and the final state of the deployment creation. An WML online scoring deployment is a REST endpoint to which you send (https request) your input data and get scoring results back as response.

  3. Create the streaming application Once the deployment is active you can create the streaming application using the streamsx Python package. You can read data from several sources, transform them, do analytics and write them sinks. In this process you can integrate now the streamsx.wml.wml_online_scoring() function which provides the high performance approach of WML online scoring. Which means that multiple input data are send within a single REST request to the scoring REST endpoint. The maximum number of contained input data can be controlled by parameter.

Sample

A simple sample which scores on an online deployment running a model created for the well know IRIS data set to predict the Iris species from size of the petal and sepal of an exemplar:

from icpd_core import icpd_util
from streamsx.topology.topology import Topology
from streamsx.topology import context
import json
import time
import streamsx.wml as wml
import streamsx.wml.utils as wml_utils

streams_instance_name = 'streams'
cfg=icpd_util.get_service_instance_details(name=streams_instance_name, instance_type="streams")

#field mapping supports JSON string and Python dict
field_mapping_dict =[{"model_field":"Sepal.Length",
                      "tuple_field":"sepal_length"},
                     {"model_field":"Sepal.Width",
                      "tuple_field":"sepal_width"},
                     {"model_field":"Petal.Length",
                      "tuple_field":"petal_length"},
                     {"model_field":"Petal.Width",
                      "tuple_field":"petal_width"}]

#field mapping can be of type dict or a JSON string reflecting same content
field_mapping = json.dumps(field_mapping_dict)

# credentials support JSON string or Python dict
logged_in_users_wml_credentials = json.dumps(wml_utils.get_wml_credentials())  #token,url,instance_id,version

topo = Topology(name="WMLOnlineScoring")

class TestSource:
    def __init__(self, ):
        pass
    def __call__(self):
        # let it stream forever
        counter = 0
        while True:
            counter += 1
            time.sleep(0.01)
            #yield everytime same values, doesn't matter for test
            yield {"petal_length":1.4,
                   "petal_width":0.2,
                   "sepal_length":5.1,
                   "sepal_width":3.5,
                   "number" : counter}

records = topo.source(TestSource())

# 2 result streams are generated: 1st with successful scorings, 2nd with failed scorings because of invalid input
scorings,invalids = wml.wml_online_scoring(records, #input stream
                                     '72a15621-5e2e-44b5-a245-6a0fabc5de1e', #deployment_guid
                                     field_mapping,
                                     logged_in_users_wml_credentials,
                                     'e34d2846-cc27-4e8a-80af-3d0f7021d0cb', #space_guid
                                     bundle_size = 10,
                                     queue_size = 1000,
                                     threads_per_node = 1)

# publish results as JSON
scorings.publish(topic="ScoredRecords",schema=json,name="PublishScores")
score_view = scorings.view(name="ScoredRecords", description="Sample of scored records")

# Disable SSL certificate verification on test
cfg[context.ConfigParams.SSL_VERIFY] = False

# build and submit
submission_result = context.submit('DISTRIBUTED',
                                   topo,
                                   cfg)
streamsx.wml.wml_online_scoring(stream, deployment_guid, field_mapping, credentials, space_guid, expected_load=0, queue_size=2000, threads_per_node=2, single_output=False, node_count=1, connectionConfiguration=None, name=None, bundle_size=100)

Scoring tuples received from input stream using the online scoring endpoint of WML referenced by the deployment_guid. Mapping from input attributes to the models minning fields is done by the field_mapping parameter.

Parameters
  • stream (streamsx.topology.topology.Stream) – input stream of data to be scored

  • deployment_guid (str) – GUID of the WML deplyoment to be used to score the input data

  • field_mapping (list|str) – list or JSON string representing the mappings between model fields and input tuple elements [{‘model_field’:str, ‘is_mandatory’:bool,’tuple_field’:str},{…}]

  • credentials (dict|str) – dict or JSON string of WML credentials to be used {‘url’:<cp4d_cluster_url>,’token’:<token_of authenticated_user>,’instance_id’: ‘wml_local’,’version’:‘2.5.0’}

  • space_guid (str) – GUID of the used deployment space

  • expected_load (int, optional) – depricated, use new parameter bundle_size ! The expected tuple throughput which is used to determine a maximum number of tuple to be sent in one request to WML online scoring, maximum number = expected_load / threads_per_node this value has most impact on throughput performance, defaults to None

  • queue_size (int, optional) – The internal buffer size after which back-pressure happens, defaults to 2000

  • threads_per_node (int, optional) – optional field to set the number of threads used to process the received tuples may increase throughput performance by using free resources while other thread(s) waiting for the asynchronous result from online scoring, defaults to 2

  • single_output (bool, optional) – optional field to define if error data and success data should be send to one output (stream) or to 2 different outputs (streams), defaults to False

  • node_count (int, optional) – optional field giving the number of REST nodes which share the load, defaults to 1

  • name (str, optional) – Give the resulting Streams operator a name of your choice

  • bundle_size (int, optional) – optional field for setting maximum number of tuple to be sent in one scoring request this value has most impact on throughput performance, defaults to 100

Returns

Returns streams for further processing depending on ‘’single_output’’ setting.

’‘single_output’’ = False
result_stream

Tuples as received on input extended by the scoring result field predictions of type dict

error_stream

Tuples as received on input extended by the error indication scoring_error of type dict

’‘single_output’’ = True:
result_stream

all Tuples as received on input, extended by the scoring result field ‘’prediction’’ of type dict or by field ‘’prediction_error’’ of type dict

Return type

result_stream, error_stream(streamsx.topology.topology.Stream, streamsx.topology.topology.Stream)

Indices and tables