Forecasting pipeline

This part of the series of orchestration solution templates explains how to automate the creation of forecasts with a model that was already built in TIM Studio. Reusing an existing model is what distinguishes this template from the automation of RTInstantML forecasting, where a model is built instantly at forecast time. For orchestration and automation we use Azure Data Factory.

Creating a Data Factory pipeline

To follow the step-by-step guide below, there are three requirements:

Steps for creating the pipeline:

  1. Log in to Azure Data Factory. You should see a welcome screen similar to the one in the image below. In the left pane, go to the “Author” tab.

    image.png

  2. Create a new pipeline as shown in the image below.

    image.png

    After that, you should see an empty pipeline. To simplify navigation on the screen, we split it visually into the “Factory Resources pane”, “Activity pane”, “Lower pane”, “Upper pane 1”, “Upper pane 2”, and “Workspace”; see the image below.

    image.png

  3. In the “Lower pane”, go to the “General” tab and type a name for the pipeline, let's say "build_forecast".

    image.png

  4. Then, go to the “Parameters” tab and add five parameters:

    • username – TIM Studio username
    • password – TIM Studio password
    • url – URL of the TIM Studio instance (http://timstudio.tangent.works)
    • mbd_id – ID of the model building definition, set the default to 0
    • prediction_horizon – number of samples to predict, set the default to 0

    Set the default values as shown in the image below; we will populate them when running the pipeline.

    image.png

  5. Similarly, go to the "Variables" tab and create three variables:

    • token - for storing the authorization token
    • prediction_from - the prediction start datetime
    • model_id - ID of the model used for the prediction

    These variables will carry information between activities throughout the pipeline run.

    image.png

  6. Now, go to the “Activity pane” -> “General” and find the “Web” activity. Drag and drop it into the “Workspace” and select it by clicking on it. In the "Lower pane", go to the "General" tab and fill in the name of the activity, e.g. "get_auth_token". This activity will be responsible for the user authentication.

    image.png

  7. By replicating the previous step, add five more activities: 2 x "Set variable", 2 x "Web", and 1 x "Azure Function". Join the activities with green arrows as illustrated in the image below. An arrow indicates the order in which the activities of the pipeline run, and the green color indicates that an activity executes only if the previous activity finished successfully.

    image.png

  8. Select the first "Web" activity. In the “Lower pane”, go to the “Settings” tab. Here we specify the API request for the user authentication, using the username and password parameters of the pipeline defined in step 4.

    URL: @{pipeline().parameters.url}/api/auth/
    Method: POST
    Body: {"username":"@{pipeline().parameters.username}","password":"@{pipeline().parameters.password}"}
    

    Enter each of the values by clicking on the corresponding input field and selecting "Add dynamic content".

    image.png
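    For reference, the same authentication request can be reproduced outside Data Factory, which is handy for verifying the credentials before wiring up the pipeline. A minimal sketch in Python (the credential values are placeholders; the /api/auth/ endpoint and the "token" response property follow the settings above and step 9):

    import requests

    # Placeholder values - replace with your TIM Studio instance and credentials.
    url = "http://timstudio.tangent.works"
    payload = {"username": "my_user", "password": "my_password"}

    # POST /api/auth/ returns a JSON object with a "token" property,
    # which is what the "get_auth_token" Web activity retrieves.
    response = requests.post(f"{url}/api/auth/", json=payload)
    response.raise_for_status()
    token = response.json()["token"]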

  9. Select one of the two "Set variable" activities. In the “Lower pane”, go to the “Settings” tab. Select the token variable defined in step 5 and set its value to:

    value: @{activity('get_auth_token').output.token}
    

    This takes the token retrieved by the "get_auth_token" Web activity and stores it in the token variable created in step 5.

    image.png

  10. Select the second "Set variable" activity. In the “Lower pane”, go to the “Settings” tab. Select the prediction_from variable defined in step 5 and set its value to:

    value: @addhours(concat(substring(utcnow(), 0, 13), ':00:00Z'), 1)
    

    The expression above transforms the current UTC datetime by rounding it down to the hour and then adding one hour. For example, if the pipeline runs at 2019-01-01T08:20:35Z, then prediction_from will be set to 2019-01-01T09:00:00Z.

    image.png
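    The same transformation can be sketched in Python to make the string manipulation explicit (a mirror of the expression above, not code used by the pipeline itself):

    from datetime import datetime, timedelta, timezone

    # Mirror of @addhours(concat(substring(utcnow(), 0, 13), ':00:00Z'), 1):
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")  # e.g. 2019-01-01T08:20:35.123456Z
    truncated = now[:13] + ":00:00Z"  # keep "YYYY-MM-DDTHH", i.e. round down to the hour
    prediction_from = (
        datetime.strptime(truncated, "%Y-%m-%dT%H:%M:%SZ") + timedelta(hours=1)
    ).strftime("%Y-%m-%dT%H:%M:%SZ")  # e.g. 2019-01-01T09:00:00Z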

  11. Now we want to get the list of all models from TIM Studio. Select the second "Web" activity and go to the "Settings" tab. Here we specify the TIM Studio API request. Fill in the input fields as follows:

    URL: @{pipeline().parameters.url}/api/prediction/models/
    Method: GET
    Headers:
        Authorization: Bearer @{variables('token')}
    

    It is a GET request with token authorization in the header.

    image.png
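    This request, too, can be verified manually before running the pipeline; a minimal Python sketch reusing the token from the authentication sketch in step 8 (the response structure is shown in step 12):

    import requests

    url = "http://timstudio.tangent.works"  # placeholder instance URL
    token = "..."                           # token obtained from /api/auth/

    # GET /api/prediction/models/ with Bearer token authorization,
    # mirroring the "get_all_models" Web activity above.
    response = requests.get(
        f"{url}/api/prediction/models/",
        headers={"Authorization": f"Bearer {token}"},
    )
    response.raise_for_status()
    models = response.json()  # array of models (id, dataset, datetime_created, name, status, mbd)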

  12. The goal now is to select the latest model that was built under the model building definition specified in the parameters (see step 4). First, you need a Function App in your Azure subscription; if you do not have one, follow this manual to create it. Under the Function App, you can create your Azure Function. Follow this manual to create an HTTP-triggered Azure Function in Python.

    Here we provide the code for the function:

    __init__.py

    import json
    import logging
    
    import azure.functions as func
    
    def main(req: func.HttpRequest) -> func.HttpResponse:
        logging.info('Python HTTP trigger function processed a request.')
    
        # ISO 8601 timestamps compare correctly as strings, so this acts as a minimum.
        dt_max = "0000-00-00T00:00:00.000000Z"
        idx_max = -1
    
        try:
            req_body = req.get_json()
    
            mbd_id = req_body["mbd"]
            models = req_body["models"]
    
            # Find the most recently created model that belongs to the requested
            # model building definition and finished building successfully.
            for idx, m in enumerate(models):
                if m["mbd"] == mbd_id and m["datetime_created"] > dt_max and m["status"] in ("Finished", "FinishedWithWarning"):
                    dt_max = m["datetime_created"]
                    idx_max = idx
        except ValueError:
            return func.HttpResponse(
                 "Can't parse request body",
                 status_code=400
            )
        except KeyError as e:
            return func.HttpResponse(
                 'Missing key: %s' % str(e),
                 status_code=400
            )
    
        if idx_max == -1:
            return func.HttpResponse(
                 "Can't find relevant model for given model building definition",
                 status_code=400
            )
    
        # Return the selected model as valid JSON so that downstream Data Factory
        # activities can reference its properties (e.g. output.id in step 14).
        return func.HttpResponse(
            json.dumps(models[idx_max]),
            mimetype="application/json"
        )
    

    function.json

    {
      "scriptFile": "__init__.py",
      "bindings": [
        {
          "authLevel": "anonymous",
          "type": "httpTrigger",
          "direction": "in",
          "name": "req",
          "methods": [
            "post"
          ]
        },
        {
          "type": "http",
          "direction": "out",
          "name": "$return"
        }
      ]
    }
    

    Our function allows you to send an HTTP POST request to a specific URL that can be found in the Function App in the Azure portal. The body of the request should be a JSON object with two properties: "mbd" and "models". The "mbd" property is an integer value representing the model building definition ID for which we want to get the latest model. The value of the "models" property is a JSON array of models from TIM Studio; its structure should follow the structure of the response from the /prediction/models/ endpoint (we call this endpoint in the previous step). The response of the function is a JSON object with the latest model that was built under the specified model building definition.

    Below is an example of a valid body of the request and the corresponding response.

    Body of the request:

    {
      "mbd":215,
      "models":
        [
          {
            "id":373,
            "dataset":104,
            "datetime_created":"2019-11-05T15:16:31.876515Z",
            "name":"wind_turbine_def",
            "status":"Finished",
            "mbd":215
          }, 
          {
            "id":368,
            "dataset":104,
            "datetime_created":"2019-11-05T13:22:22.171762Z",
            "name":"wind_turbine_def",
            "status":"NotFound",
            "mbd":212
          }
        ]
    }
    

    Corresponding response:

    {
      "id": 373,
      "dataset": 104,
      "datetime_created": "2019-11-05T15:16:31.876515Z",
      "name": "wind_turbine_def",
      "status": "Finished",
      "mbd": 215
    }
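
    Once deployed, the function can be smoke-tested with a plain HTTP POST; a minimal sketch (the function URL is a placeholder - copy the real one from your Function App in the Azure portal; no key is needed because the authLevel is "anonymous"):

    import requests

    # Placeholder URL - use "Get Function Url" in the Azure portal to find yours.
    function_url = "https://my-function-app.azurewebsites.net/api/select_latest_model"

    body = {
        "mbd": 215,
        "models": [
            {"id": 373, "dataset": 104, "datetime_created": "2019-11-05T15:16:31.876515Z",
             "name": "wind_turbine_def", "status": "Finished", "mbd": 215},
        ],
    }

    # Expect HTTP 200 and the latest "Finished" model for mbd 215.
    response = requests.post(function_url, json=body)
    print(response.status_code, response.json())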
    
  13. Now we will use the function created in the previous step in our pipeline. Select the "Azure Function" activity in the pipeline and go to the "Settings" tab. In the first input field, you have to create a new Azure Function Linked Service that serves as a link to the Function App created in the previous step.

    Put the name of your Azure Function in the second input field and fill in the method and body input fields as follows:

    Method: POST
    Body: {"mbd":@{pipeline().parameters.mbd_id},"models":@{activity('get_all_models').output.Response}}
    

    image.png

  14. Select the third "Web" activity and go to the “Settings” tab. Here we specify the API request for building the forecast. Fill in the input fields as follows:

    URL: @{pipeline().parameters.url}/api/prediction/models/@{activity('select_latest_model').output.id}/predict/
    Method: POST
    Headers:
        Authorization: Bearer @{variables('token')}
    Body: {"prediction_scope":{"count":@{pipeline().parameters.prediction_horizon},"_from":"@{substring(variables('prediction_from'), 0, 19)}Z"}}
    

    The URL of the request contains the ID of the model retrieved in the previous step. The body specifies how many samples to forecast and which datetime to start the forecast from; the prediction_from variable is trimmed to its first 19 characters (YYYY-MM-DDTHH:MM:SS) and a trailing "Z" is appended.

    image.png
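    For completeness, a minimal Python sketch of the same forecasting request (the model ID, horizon, and start datetime are placeholder values; the body mirrors the Web activity settings above):

    import requests

    url = "http://timstudio.tangent.works"  # placeholder instance URL
    token = "..."                           # token obtained from /api/auth/
    model_id = 373                          # ID of the latest model selected in step 13

    body = {
        "prediction_scope": {
            "count": 24,                      # prediction_horizon parameter
            "_from": "2019-01-01T09:00:00Z"   # prediction_from variable, trimmed to 19 characters plus "Z"
        }
    }

    # POST /api/prediction/models/<model_id>/predict/ with Bearer token authorization.
    response = requests.post(
        f"{url}/api/prediction/models/{model_id}/predict/",
        headers={"Authorization": f"Bearer {token}"},
        json=body,
    )
    response.raise_for_status()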

  15. To test the pipeline, click on “Debug” in the “Upper pane 1”. In the popup, we have to specify the input parameters of the pipeline. Change the default values of the “username”, “password”, and “url” parameters if needed. Set prediction_horizon to the number of samples you want to forecast.

    image.png

    The “mbd_id” can be found in the "Model building definition" screen of TIM Studio, in the "ID" column.

    image.png

    After filling in the parameters, click on “Finish”. You should see the output of the debug run in the “Lower pane” -> “Output”. All activities should end with the status "Succeeded" except the “build_forecast” activity, which ends with the status "Failed" and an error message saying that the length of the execution output is over the limit (see image below). It fails because the response from the TIM Studio API is too long when the forecast is built.

    image.png

    Nevertheless, the forecast was built, which can be verified in TIM Studio: go to the Forecasting screen, and under the dataset you should see your forecast.

    image.png

  16. Now we can run the pipeline and simulate production mode. Navigate to the "Upper pane 1" and select "Add trigger" -> "Trigger now". Fill in the parameters as when debugging the pipeline in the previous step.

    In the left pane, go to the "Monitor" tab to see the result of the pipeline run. After a while, we should see that it has failed. This is caused by the failing "build_forecast" activity.

    image.png

  17. As we saw in the last two steps, the forecasting response is too long, causing the activity to fail even though the forecast is built; running the pipeline in production then shows the "Failed" status for the entire pipeline. To prevent this, repeat step 5 and add one more variable called error_message.

    image.png

    Then, add "Set variable" activity to the end of the pipeline. Join it with the previous activity with blue arrow, see image below. The blue "on completion" arrow illustrates that the last activity executes regardless of execution status of the previous activity. Select the activity and go to "Settings" tab. In the first input field select error_message variable and assign it error message from previous activity:

    value: @activity('build_forecast').Error.Message
    

    image.png

  18. You can repeat step 15 to debug the pipeline and step 16 to simulate a production run. The results should be similar; the only difference is that now, when running the pipeline in production, we should no longer see the "Failed" status.

    image.png

  19. In the last step, go back to the "Author" tab and save the pipeline by clicking on the “Publish all” button in the “Upper pane 2”.

Download the pipeline

The pipeline can be downloaded here.

If you followed the step-by-step guide above, you have noticed that the forecasting pipeline uses the latest model under the specified model building definition to create the forecast. Sometimes, however, you may want to specify explicitly which model to use for building the forecast. For such a case, you can download a template of that pipeline here.

See Importing pipelines for instructions on importing the pipeline into your Azure Data Factory subscription. When importing the pipeline, you will be asked to enter a linked service to your Azure Function App with an existing Azure Function, which is required by the pipeline. If you do not have one yet, replicate step 12 of the manual above.

Next steps