Forecasting pipeline
This part of the orchestration solution templates series explains how to automate the creation of forecasts with a model that was already built in TIM Studio. Reusing an existing model is the key difference from the automation of RTInstantML forecasting, where a model is built instantly at the time of the forecast. For orchestration and automation we use the Azure Data Factory tool.
- the Creating Data Factory pipeline subsection below provides a step-by-step guide for building the orchestration pipeline
- to download the pipeline and import it into your Azure Data Factory subscription, go to the Download the pipeline subsection
Creating Data Factory pipeline¶
To follow the step-by-step guide below, there are three prerequisites:
- create a forecasting workspace
- load a dataset into TIM Studio
- create a model building definition and build a model for this dataset
Steps for creating the pipeline:
1. Log in to Azure Data Factory. You should see a welcome screen similar to the one in the image below. In the left pane, go to the “Author” tab.
2. Create a new pipeline as shown in the image below.
After that, you should see an empty pipeline. To simplify navigation on the screen, we split it visually into “Factory Resources pane”, “Activity pane”, “Lower pane”, “Upper pane 1”, “Upper pane 2”, and “Workspace”; see the image below.
3. In the “Lower pane”, go to the “General” tab and type the name of the pipeline, e.g. "build_forecast".
4. Then, go to the “Parameters” tab and add five parameters:
- username – TIM Studio username
- password – TIM Studio password
- url – URL of TIM Studio (http://timstudio.tangent.works)
- mbd_id – ID of the model building definition, set the default to 0
- prediction_horizon – number of samples to predict, set the default to 0
Set the default values as shown in the image below; we will populate them when running the pipeline.
5. Similarly, go to the "Variables" tab and create three variables:
- token - for storing the authorization token
- prediction_from - prediction start datetime
- model_id - ID of the model used for prediction
These variables will store information throughout the pipeline run.
6. Now, go to the “Activity pane” -> “General” and find “Web”. Drag & drop this activity into the “Workspace”. Select the activity by clicking on it. In the "Lower pane", go to the "General" tab and fill in the name of the activity, e.g. "get_auth_token". This activity will be responsible for the user authentication.
7. By replicating the previous step, add five more activities - 2 x "Set variable", 2 x "Web" and 1 x "Azure Function". Join the activities with green arrows as illustrated in the image below. The arrows indicate the order in which the activities of the pipeline will run, and the green color indicates that an activity executes only if the previous activity was successful.
8. Select the first "Web" activity. In the “Lower pane”, go to the “Settings” tab. Here we have to specify the API request for the user authentication. We use the username and password parameters of the pipeline defined in step 4.
URL: @{pipeline().parameters.url}/api/auth/
Method: POST
Body: {"username":"@{pipeline().parameters.username}","password":"@{pipeline().parameters.password}"}
Enter each of the values by clicking on the corresponding input field and selecting "Add dynamic content".
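If you want to verify the authentication call outside of Data Factory, the same request can be sketched in Python. This is an illustrative sketch only; it assumes the /api/auth/ endpoint returns a JSON object with a token field, which is what the pipeline reads in step 9.

import requests

# Hypothetical values - replace with your own TIM Studio URL and credentials
url = "http://timstudio.tangent.works"
credentials = {"username": "my_user", "password": "my_password"}

# POST /api/auth/ - mirrors the body of the "get_auth_token" Web activity
response = requests.post(f"{url}/api/auth/", json=credentials)
response.raise_for_status()

# The pipeline later reads this value via @{activity('get_auth_token').output.token}
token = response.json()["token"]
print(token)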
9. Select one of the two "Set variable" activities. In the “Lower pane”, go to the “Settings” tab. Select the token variable that was defined in step 5 and set the value to:
value: @{activity('get_auth_token').output.token}
This takes the token retrieved by the "Web" activity and stores it in the token variable we created in step 5.
10. Select the second "Set variable" activity. In the “Lower pane”, go to the “Settings” tab. Select the prediction_from variable that was defined in step 5 and set the value to:
value: @addhours(concat(substring(utcnow(), 0, 13), ':00:00Z'), 1)
The expression above transforms the current UTC datetime by rounding it down to the hour and then adding one hour. For example, if the pipeline runs at 2019-01-01T08:20:35Z, then prediction_from will be set to 2019-01-01T09:00:00Z.
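The same rounding logic can be expressed in a few lines of Python, which may help when checking the expression; this is just an illustrative sketch, not part of the pipeline itself.

from datetime import datetime, timedelta, timezone

def next_full_hour(now: datetime) -> datetime:
    # Round down to the hour, then add one hour - mirrors the Data Factory expression
    return now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)

# Example from the text: 2019-01-01T08:20:35Z -> 2019-01-01T09:00:00Z
run_time = datetime(2019, 1, 1, 8, 20, 35, tzinfo=timezone.utc)
print(next_full_hour(run_time).strftime("%Y-%m-%dT%H:%M:%SZ"))  # 2019-01-01T09:00:00Z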
11. Now we want to get a list of all models from TIM Studio. Select the second "Web" activity and go to the "Settings" tab. Here we have to specify the Studio API request. Fill in the input fields as follows:
URL: @{pipeline().parameters.url}/api/prediction/models/
Method: GET
Headers: Authorization: Bearer @{variables('token')}
It is a GET request with token authorization in the header.
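Outside of Data Factory, the equivalent request could look roughly like the sketch below; it reuses the token obtained in step 8 and assumes the endpoint returns a JSON array of models structured as described in step 12.

import requests

url = "http://timstudio.tangent.works"   # value of the url pipeline parameter
token = "..."                            # token obtained from /api/auth/

# GET /api/prediction/models/ with token authorization in the header
response = requests.get(
    f"{url}/api/prediction/models/",
    headers={"Authorization": f"Bearer {token}"},
)
response.raise_for_status()
models = response.json()
print(len(models), "models found")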
12. The goal is now to select the latest model that was built under the model building definition specified in the parameters (see step 4). First, you need a Function App in your Azure subscription. If you do not have one, follow the manual to create a Function App. Under the Function App, you can create your Azure Function. Follow the manual to create an HTTP-triggered Azure Function in the Python language.
Here we provide the code for the function:
__init__.py
import logging

import azure.functions as func


def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    dt_max = "0000-00-00T00:00:00.000000Z"
    idx_max = -1
    mbd_id = None
    models = None
    try:
        req_body = req.get_json()
        mbd_id = req_body["mbd"]
        models = req_body["models"]
        for idx, m in enumerate(models):
            if (m["mbd"] == mbd_id and m["datetime_created"] > dt_max
                    and (m["status"] == "Finished" or m["status"] == "FinishedWithWarning")):
                dt_max = m["datetime_created"]
                idx_max = idx
    except ValueError:
        return func.HttpResponse(
            "Can't parse request body",
            status_code=400
        )
    except KeyError as e:
        return func.HttpResponse(
            'Missing key: %s' % str(e),
            status_code=400
        )
    if idx_max == -1:
        return func.HttpResponse(
            "Can't find relevant model for given model building definition",
            status_code=400
        )
    else:
        return func.HttpResponse(f"{models[idx_max]}")
function.json
{ "scriptFile": "__init__.py", "bindings": [ { "authLevel": "anonymous", "type": "httpTrigger", "direction": "in", "name": "req", "methods": [ "post" ] }, { "type": "http", "direction": "out", "name": "$return" } ] }
Our function accepts an HTTP POST request sent to a specific URL that can be found in the Function App in the Azure portal. The body of the request should be a JSON object consisting of two properties - "mbd" and "models". The "mbd" property has an integer value that represents the model building definition ID for which we want to get the latest model. The value of the "models" property is a JSON array of models from TIM Studio. The structure of the array should follow the structure of the response from the /prediction/models/ endpoint (we call this endpoint in the previous step). The response of the function is a JSON object with the latest model that was built under the specified model building definition.
Below is an example of a valid body of the request and the corresponding response.
Body of the request:
{ "mbd":215, "models": [ { "id":373, "dataset":104, "datetime_created":"2019-11-05T15:16:31.876515Z", "name":"wind_turbine_def", "status":"Finished", "mbd":215 }, { "id":368, "dataset":104, "datetime_created":"2019-11-05T13:22:22.171762Z", "name":"wind_turbine_def", "status":"NotFound", "mbd":212 } ] }
Corresponding response:
{ 'id': 373, 'dataset': 104, 'datetime_created': '2019-11-05T15:16:31.876515Z', 'name': 'wind_turbine_def', 'status': 'Finished', 'mbd': 215 }
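Before wiring the function into the pipeline, you can check it locally by invoking main() with a synthetic request built from the example body above. This is an illustrative sketch only; the function folder name in the import is hypothetical and should match your own project layout.

import json

import azure.functions as func

from SelectLatestModel import main  # hypothetical function folder containing __init__.py above

# Build a synthetic HTTP request with the example body from above
body = {
    "mbd": 215,
    "models": [
        {"id": 373, "dataset": 104, "datetime_created": "2019-11-05T15:16:31.876515Z",
         "name": "wind_turbine_def", "status": "Finished", "mbd": 215},
        {"id": 368, "dataset": 104, "datetime_created": "2019-11-05T13:22:22.171762Z",
         "name": "wind_turbine_def", "status": "NotFound", "mbd": 212},
    ],
}
req = func.HttpRequest(
    method="POST",
    url="/api/SelectLatestModel",   # hypothetical route, adjust to your function name
    body=json.dumps(body).encode("utf-8"),
)

resp = main(req)
print(resp.status_code)            # 200
print(resp.get_body().decode())    # the latest "Finished" model for mbd 215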
13. Now we will use the function created in the previous step in our pipeline. Select the "Azure Function" activity in the pipeline and go to the "Settings" tab. In the first input field, you have to create a new linked service that serves as a link to the Function App created in the previous step.
Put the name of your Azure Function in the second input field and fill in the method and body input fields as follows:
Method: POST
Body: {"mbd":@{pipeline().parameters.mbd_id},"models":@{activity('get_all_models').output.Response}}
14. Select the third "Web" activity and go to the “Settings” tab. Here we have to specify the API request for building the forecast. Fill in the input fields as follows:
URL: @{pipeline().parameters.url}/api/prediction/models/@{activity('select_latest_model').output.id}/predict/
Method: POST
Headers: Authorization: Bearer @{variables('token')}
Body: {"prediction_scope":{"count":@{pipeline().parameters.prediction_horizon},"_from":"@{substring(variables('prediction_from'), 0, 19)}Z"}}
The URL of the request contains the ID of the model that was retrieved in the previous step. The body specifies how many samples to forecast and the datetime from which the forecast should start.
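For orientation, the resolved request could look roughly like the sketch below in Python; the model ID, token, and horizon values are only examples filled in by hand, not part of the pipeline.

import requests

url = "http://timstudio.tangent.works"   # value of the url pipeline parameter
token = "..."                            # token obtained from /api/auth/
model_id = 373                           # example ID returned by the Azure Function
prediction_horizon = 24                  # number of samples to predict

# POST /api/prediction/models/<id>/predict/ - mirrors the "build_forecast" activity
body = {
    "prediction_scope": {
        "count": prediction_horizon,
        "_from": "2019-01-01T09:00:00Z",  # the prediction_from variable from step 10
    }
}
response = requests.post(
    f"{url}/api/prediction/models/{model_id}/predict/",
    headers={"Authorization": f"Bearer {token}"},
    json=body,
)
print(response.status_code)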
15. To test the pipeline, click on “Debug” in the “Upper pane 1”. In the popup, we have to specify the input parameters entering the pipeline. Change the default values of the “username”, “password”, and “url” parameters if needed. Set prediction_horizon to the number of samples you want to forecast.
The “mbd_id” can be found in the "Model building definition" screen of TIM Studio in the column "ID".
After filling in the parameters, click on “Finish”. You should see the output of the debug run in the “Lower pane” -> “Output”. All activities should end with the status "Succeeded" except the “build_forecast” activity, which ends with the status "Failed" and an error message saying that the length of the execution output is over the limit (see image below). It fails because the response from the TIM Studio API is too long when building the forecast.
Nevertheless, the forecast was built, and this can be verified in TIM Studio. Go to the Forecasting screen; under the dataset you should see your forecast.
16. Now we can run the pipeline and simulate production mode. Navigate to the "Upper pane 1" and select "Add trigger" -> "Trigger now". Fill in the parameters in the same way as when debugging the pipeline in the previous step.
In the left pane, go to the "Monitor" tab to see the result of the pipeline run. After a while we should see that it has failed. This is caused by the failing "build_forecast" activity.
17. As we saw in the last two steps, the forecasting response is too long, causing the activity to fail even though the forecast is built. Running the pipeline in production then shows the "Failed" status for the entire pipeline. To prevent this, repeat step 5 and add one more variable called error_message.
Then, add a "Set variable" activity to the end of the pipeline. Join it with the previous activity with a blue arrow, see the image below. The blue "on completion" arrow means that the last activity executes regardless of the execution status of the previous activity. Select the activity and go to the "Settings" tab. In the first input field, select the error_message variable and assign it the error message from the previous activity:
value: @activity('build_forecast').Error.Message
18. You can repeat step 15 to debug the pipeline and step 16 to simulate a production run. The result should be similar. The only difference is that now, when running the pipeline in production, we should no longer see the "Failed" status.
19. In the last step, go back to the "Author" tab and save the pipeline by clicking on the “Publish all” button in the “Upper pane 2”.
Download the pipeline¶
The pipeline can be downloaded here.
If you followed the step-by-step guide above, you have noticed that the forecasting pipeline uses the latest model under the specified model building definition to create the forecast. Sometimes, however, you may want to explicitly specify the model you want to use for building the forecast. For such a case, you can download a template of the pipeline here.
See Importing pipelines for importing the pipeline into your Azure Data Factory subscription. When importing the pipeline, you will be asked to enter a linked service to your Azure Function App with an existing Azure Function, which is required by the pipeline. If you do not have it yet, replicate step 12 of the manual above.
Next steps¶
- for creating a trigger that executes the pipeline on schedule, see Triggering pipelines
- consider also creating the RTInstantML pipeline
- to automate the data update, see Data Update pipeline
- to automate the model building, see Model Building pipeline