Automated testing of Synapse Pipelines using xUnit and Azure DevOps

Introduction

Synapse Analytics Pipelines is a powerful integration and ETL/ELT tool in Azure. It offers code-free orchestration with over 90 data source connectors, plus data flow capabilities for more advanced transformation scenarios.

As with any other code, applications and deployments, Synapse Pipelines solutions should be included in your integration test plan, and those tests should be automated.

This post demonstrates the steps required to get fully automated Synapse Pipelines tests running from Azure DevOps.

Use case

I've prepared a pipeline for testing purposes which imports movie data from TheMovieDatabase.org and converts it from JSON to Parquet format using Synapse Pipelines. The JSON data import is implemented using an Azure Function. Below is a high-level diagram of the data ingestion and transformation process.

High-level solution architecture
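
The import function itself is not covered in detail in this post. As a rough, hypothetical sketch (the TMDb endpoint, timer schedule, container name and connection string placeholder below are all assumptions, not the actual implementation), it could look something like this:

  using Azure.Storage.Blobs;
  using Microsoft.Azure.Functions.Worker;

  public class ImportMoviesFunction
  {
      private static readonly HttpClient Http = new();

      // Hypothetical daily import; endpoint, schedule and storage names are assumptions.
      [Function("ImportMovies")]
      public async Task Run([TimerTrigger("0 0 3 * * *")] TimerInfo timer)
      {
          // Fetch movie data from the TMDb API (API key handling omitted).
          var json = await Http.GetStringAsync(
              "https://api.themoviedb.org/3/movie/popular?api_key=<api-key>");

          // Land the raw JSON in the data lake for the Synapse pipeline to pick up.
          var container = new BlobContainerClient("<storage-connection-string>", "raw");
          await container.UploadBlobAsync(
              $"movies/{DateTime.UtcNow:yyyyMMdd}.json", BinaryData.FromString(json));
      }
  }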

The data transformation part is done using a data flow executed from the data pipeline.

Data flow in Synapse Pipelines

The data flow does a couple of extra tasks before saving the data in Parquet format. It converts the release date to a specific date format and flattens the movie genre information from the JSON result into a separate data structure.
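
To make the transformation concrete, here is roughly the same logic expressed in C# (illustrative only; the property names follow the TMDb JSON shape and are assumptions, not the actual data flow definition):

  using System.Linq;
  using System.Text.Json;

  // A single movie in the (assumed) shape returned by the import step.
  var movieJson = """{ "id": 603, "release_date": "1999-03-31", "genre_ids": [28, 878] }""";
  var movie = JsonDocument.Parse(movieJson).RootElement;

  // Derived column: parse the release date string into a typed date.
  var releaseDate = DateOnly.ParseExact(movie.GetProperty("release_date").GetString()!, "yyyy-MM-dd");

  // Flatten: one (MovieId, GenreId) row per entry in the nested genre array.
  var movieId = movie.GetProperty("id").GetInt32();
  var movieGenres = movie.GetProperty("genre_ids").EnumerateArray()
      .Select(genre => new { MovieId = movieId, GenreId = genre.GetInt32() })
      .ToList();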

The following picture illustrates the end result as external tables in Synapse Serverless SQL Pool.

Synapse Serverless SQL Pool tables

The test scenario consists of executing the data pipeline automatically from a release pipeline in Azure DevOps and verifying that the pipeline has executed successfully. This is only the bare minimum of what should be tested in real life, but it should be enough to explain the core concept and to give you ideas on how the test project could be developed further.

Test project setup

The test solution is built using C# and xUnit and consists of two test projects: one that tests the backend solution and is included here solely as a placeholder, and another for the Synapse Pipelines tests.

Test project structure

The Synapse.Tests project uses the Synapse Analytics SDK to connect to the Synapse Analytics instance. You'll need to install the Azure.Analytics.Synapse.Artifacts NuGet package (e.g. with dotnet add package Azure.Analytics.Synapse.Artifacts) to be able to communicate with the workspace.


  using Azure.Analytics.Synapse.Artifacts;
  using Azure.Analytics.Synapse.Artifacts.Models;
  using Azure.Identity;

  namespace Synapse.Tests;

  public class SynapseClient
  {
      // Polling interval between pipeline run status checks.
      private const int SleepDurationInMs = 15000;
      private readonly string _workspaceName;
      private readonly PipelineClient _pipelineClient;
      private readonly PipelineRunClient _pipelineRunClient;
      private readonly string _tenantId;

      public SynapseClient(string workspaceName, string tenantId)
      {
          _workspaceName = workspaceName;
          _tenantId = tenantId;

          // DefaultAzureCredential resolves credentials from the environment (Azure CLI,
          // Visual Studio, managed identity, etc.); the tenant id helps local VS sign-in.
          var credentials = new DefaultAzureCredential(new DefaultAzureCredentialOptions
          {
              VisualStudioTenantId = _tenantId
          });
          _pipelineClient = new PipelineClient(new Uri($"https://{_workspaceName}.dev.azuresynapse.net"), credentials);
          _pipelineRunClient = new PipelineRunClient(new Uri($"https://{_workspaceName}.dev.azuresynapse.net"), credentials);
      }

      public async Task<bool> ExecutePipelineAsync(string pipelineName)
      {
          if (string.IsNullOrWhiteSpace(pipelineName))
          {
              throw new ArgumentException("Pipeline name cannot be null or empty", nameof(pipelineName));   
          }

          // Trigger a new run of the named pipeline and capture its run id.
          var executionResult = await _pipelineClient.CreatePipelineRunAsync(pipelineName);

          PipelineRun pipelineRun;
          while (true)
          {
              // Poll the run status until the pipeline leaves the Queued/InProgress states.
              pipelineRun = await _pipelineRunClient.GetPipelineRunAsync(executionResult.Value.RunId);

              Console.WriteLine("Status: " + pipelineRun.Status);
              if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued")
                  await Task.Delay(SleepDurationInMs);
              else
                  return pipelineRun.Status == "Succeeded";
          }
      }
  }

We can now write the tests which utilize the SynapseClient class.


  public class MoviesPipelineTests
  {
      // Placeholders: fill in your Azure AD tenant id and Synapse workspace name.
      private const string TenantId = "[]";
      private const string SynapseWorkspaceName = "[]";
      private readonly SynapseClient _synapseClient;

      public MoviesPipelineTests()
      {
          _synapseClient = new SynapseClient(SynapseWorkspaceName, TenantId);
      }

      [Fact]
      public async Task ExecutePipeline_Valid_ReturnTrue()
      {
          Assert.True(await _synapseClient.ExecutePipelineAsync("Movies JSON to Parquet"));
      }

      [Fact]
      public async Task ExecutePipeline_EmptyName_ThrowsArgumentException()
      {
          await Assert.ThrowsAsync<ArgumentException>(async () => await _synapseClient.ExecutePipelineAsync(""));
      }
  }

As I mentioned earlier, this is just a minimum set of tests which only verifies that the data pipeline in Synapse Analytics has executed successfully. Other tests should be implemented in a production environment to verify the outcome of the data pipeline execution for data freshness and quality, such as checking the number of affected rows and looking for null values in unexpected places.
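
As a sketch of what such a check could look like, the test below queries the Serverless SQL Pool external tables after the pipeline run. The endpoint, database and table/column names are assumptions for illustration, and the Microsoft.Data.SqlClient package would be needed:

  using Microsoft.Data.SqlClient;
  using Xunit;

  public class MoviesDataQualityTests
  {
      // Assumed serverless endpoint and database; "Active Directory Default" uses the
      // same DefaultAzureCredential-style authentication as the pipeline tests.
      private const string ConnectionString =
          "Server=<workspace>-ondemand.sql.azuresynapse.net;Database=Movies;" +
          "Authentication=Active Directory Default;Encrypt=True;";

      [Fact]
      public async Task MoviesTable_AfterPipelineRun_HasRowsAndNoMissingTitles()
      {
          await using var connection = new SqlConnection(ConnectionString);
          await connection.OpenAsync();

          // Volume/freshness check: the pipeline run should have produced at least one row.
          await using var countCommand = new SqlCommand("SELECT COUNT(*) FROM dbo.Movies", connection);
          Assert.True(Convert.ToInt32(await countCommand.ExecuteScalarAsync()) > 0);

          // Quality check: no unexpected NULLs in a mandatory column.
          await using var nullCommand = new SqlCommand(
              "SELECT COUNT(*) FROM dbo.Movies WHERE Title IS NULL", connection);
          Assert.Equal(0, Convert.ToInt32(await nullCommand.ExecuteScalarAsync()));
      }
  }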

DevOps pipeline configuration

Now that the test project is properly configured, we can move on to the automated test execution part.

The tests are executed using a single YAML pipeline which builds the test projects and runs the correct tests depending on which release pipeline triggered the test run. Tests are triggered whenever there's a release to the Staging environment stage, so that if the tests fail we can still react and fix potential issues before deploying to production.

Tests are executed using the AzureCLI task instead of DotNetCoreCLI to enable authentication from the test project using DefaultAzureCredential, which picks up the Azure CLI login performed by the task's service connection.


  # Disable triggering from code updates to repo
  trigger: none

  # Set up pipeline to trigger on completion of "release_Staging" stage
  resources:
    pipelines:
      - pipeline: release_api
        source: Release-Api
        trigger:
          branches:
            - release/*
          stages:
            - release_Staging

      - pipeline: release_synapse
        source: Release-Synapse
        trigger:
          branches:
            - release/*
          stages:
            - release_Staging

  variables:
    - template: Variables/variables.yaml
    - name: BuildConfiguration
      value: Release

  jobs:
    - job: release_integrationtests
      displayName: "Execute integration tests"
      pool:
        name: Azure Pipelines
        vmImage: windows-2022

      steps:
        - task: UseDotNet@2
          displayName: "Use .NET SDK 7.0.x"
          inputs:
            version: "7.0.x"

        - task: DotNetCoreCLI@2
          displayName: "Restore project dependencies"
          inputs:
            command: "restore"
            projects: "$(Build.SourcesDirectory)/Tests/IntegrationTests/src/**/*Tests*/*.csproj"
            feedsToUse: "select"

        - task: DotNetCoreCLI@2
          displayName: "Build the project"
          inputs:
            command: "build"
            arguments: "--no-restore --configuration $(BuildConfiguration)"
            projects: "$(Build.SourcesDirectory)/Tests/IntegrationTests/src/**/*Tests*/*.csproj"

        - task: AzureCLI@2
          displayName: "dotnet test Api project"
          condition: eq(variables['resources.triggeringalias'], 'release_api')
          inputs:
            azureSubscription: $(azureSubscriptionName)
            scriptType: pscore
            scriptLocation: inlineScript
            inlineScript: |
              dotnet test $(Build.SourcesDirectory)\Tests\IntegrationTests\src\Api.Tests\ --configuration $(BuildConfiguration) --logger:"trx;LogFileName=TestResultsApi.trx"

        - task: AzureCLI@2
          displayName: "dotnet test Synapse pipelines project"
          condition: eq(variables['resources.triggeringalias'], 'release_synapse')
          inputs:
            azureSubscription: $(azureSubscriptionName)
            scriptType: pscore
            scriptLocation: inlineScript
            inlineScript: |
              dotnet test $(Build.SourcesDirectory)\Tests\IntegrationTests\src\Synapse.Tests\ --configuration $(BuildConfiguration) --logger:"trx;LogFileName=TestResultsSynapse.trx"

        - task: AzureCLI@2
          displayName: "dotnet test all projects"
          condition: eq(variables['resources.triggeringalias'], '')
          inputs:
            azureSubscription: $(azureSubscriptionName)
            scriptType: pscore
            scriptLocation: inlineScript
            inlineScript: |
              dotnet test $(Build.SourcesDirectory)\Tests\IntegrationTests\src\ --configuration $(BuildConfiguration) --logger:"trx;LogFileName=TestResultsAll.trx"

        - task: PublishTestResults@2
          displayName: "Publish Test results"
          inputs:
            testResultsFormat: "VSTest"
            testResultsFiles: "$(Build.SourcesDirectory)/Tests/IntegrationTests/src/*.Tests/**/TestResults*.trx"
            mergeTestResults: true
            failTaskOnFailedTests: true

Conditions are used to execute only the tests that correspond to the triggering pipeline. The test pipeline can also be run manually when all tests from both test projects need to be executed.

The release_api pipeline resource is included in the test pipeline purely to demonstrate conditional test execution from a single pipeline.

Running the test pipeline executes the tests and publishes the results on the "Tests" tab of the current release.

Test execution summary

Summary

Testing Synapse Pipelines using the Synapse Analytics SDK is really as simple as executing any other integration tests. The only tricky part is monitoring the pipeline execution, which might require a more advanced test setup if the pipeline execution time is long, such as splitting your data pipeline into smaller testable parts or preparing a smaller test-specific dataset.
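
If long-running pipelines are a concern, one simple option is to bound the status polling with a timeout so that a stuck run fails the test instead of blocking the build agent. A minimal sketch of that idea, using the same Azure.Analytics.Synapse.Artifacts client types (the helper itself is hypothetical, not part of the code shown above):

  using Azure.Analytics.Synapse.Artifacts;

  public static class PipelineRunWaiter
  {
      public static async Task<bool> WaitForRunAsync(PipelineRunClient client, string runId, TimeSpan timeout)
      {
          using var cts = new CancellationTokenSource(timeout);
          while (true)
          {
              // Same status polling as in SynapseClient, but bounded by the timeout.
              var run = await client.GetPipelineRunAsync(runId, cts.Token);
              if (run.Value.Status != "InProgress" && run.Value.Status != "Queued")
                  return run.Value.Status == "Succeeded";

              // Throws OperationCanceledException once the timeout elapses, failing the test.
              await Task.Delay(TimeSpan.FromSeconds(15), cts.Token);
          }
      }
  }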