Automation Testing of Azure data factory pipelines

What is an azure data factory ?

It's an Azure service for ingesting data from a variety of source systems.

What to test in azure data factory  or adf ?
Monitor the pipeline run and validate that it completes successfully.
Monitor the successful execution of each activity in the pipeline. This in turn assures the correctness of the ingestion of the source data into the target.

Why should this testing be automated ?
Simply because it passes an automation feasibility assessment and is well suited to unattended runs.

Some sample code -
A few prerequisites must be available before starting automation testing of ADF:
1. Service principal - This account must have access to the ADF so it can trigger pipelines, monitor them, and perform the admin activities needed to track their execution. The automation script uses it to authenticate the requests that trigger the ADF pipelines.
2. Azure resource group, subscription id, tenant id, and similar details.
A smart approach is to keep this information in a config file to start with and, as DevOps maturity grows, read it from the key vault instead.

//this is our sample testmethod that automates the testing process of an adf pipeline.
        [TestCategory("ADF Pipeline")]
        [TestMethod]
        public void R1_RunPipeline_PipelineSample()
        {
            this.InitialiseTest();
            string strPipeline = pipelineName_PipelineSample;
            MonitorPipeline(strPipeline);

//this code ensure each activity within a adf pipeline is successfull.
            string[] strActivityarray;
            strActivityarray = new string[13] { "Activity1", "Activity2", "Activity3" };
            bool flag = VerifyActivities(strActivityarray);
            if (flag)
            {
                Assert.AreEqual(true, true);
            }
            else
            {
                Assert.AreEqual(false, true);
            }
        }

// Code for initialization of variables from config file and various other declared variable
 private void InitialiseTest()
        {
            try
            {
                this.clientId = ConfigurationManager.AppSettings.Get("clientId");
                this.clientKey = ConfigurationManager.AppSettings.Get("clientKey");
                this.resourceGroup = ConfigurationManager.AppSettings.Get("resourceGroup");
                this.dataFactory = ConfigurationManager.AppSettings.Get("dataFactory");
                this.pipelineName_PipleineSample = ConfigurationManager.AppSettings.Get("pipelinename_PipelineSample");
);
                this.RunType = ConfigurationManager.AppSettings.Get("RunType");
                this.run_date = ConfigurationManager.AppSettings.Get("run_date");
                this.start_date = ConfigurationManager.AppSettings.Get("start_date");
                this.startDate = ConfigurationManager.AppSettings.Get("startDate");
                this.StartDate = ConfigurationManager.AppSettings.Get("startDate");
                this.ContainerVersion = ConfigurationManager.AppSettings.Get("ContainerVersion");
                this.retries = Convert.ToInt32(ConfigurationManager.AppSettings.Get("retries"));
                this.secondsToWait = Convert.ToInt32(ConfigurationManager.AppSettings.Get("secondsToWait"));
            }
            catch (Exception e)
            {
                this.TestContext.WriteLine("Problem with reading a property. Details are " + e.StackTrace + " : " + e.Message);
                throw;
            }
        }

//here the execution control is done
/// <summary>
/// Creates the ADF client, starts the pipeline if no run id is set yet, and polls the
/// run until it reaches a terminal status or the configured number of retries is used up.
/// Fails the test unless the run ends with status "Succeeded".
/// </summary>
/// <param name="strPipelineName">Name of the pipeline to start and monitor.</param>
public void MonitorPipeline(string strPipelineName)
        {
            try
            {
                bool clientCreated = this.CreateAdfClient(this.clientId, this.clientKey).GetAwaiter().GetResult();
                if (!clientCreated)
                {
                    this.TestContext.WriteLine("Did not create adf client.");
                    Assert.Fail("Did not create adf client.");
                }

                // Only trigger a new run when one is not already being tracked.
                if (string.IsNullOrEmpty(this.runId))
                {
                    this.StartPipeline(this.resourceGroup, this.dataFactory, strPipelineName);
                }

                this.TestContext.WriteLine("Run Id is " + this.runId);

                int pollIntervalMs = (int)TimeSpan.FromSeconds(this.secondsToWait).TotalMilliseconds;
                for (int attempt = 1; attempt <= this.retries; attempt++)
                {
                    PipelineRun run = this.innerClient.PipelineRuns.Get(this.resourceGroup, this.dataFactory, this.runId);

                    if (run.Status == "Succeeded")
                    {
                        this.TestContext.WriteLine("Success on ADF pipeline");
                        return;
                    }

                    // "Cancelled" is terminal as well; without this check a cancelled run
                    // would be polled until the retries run out.
                    if (run.Status == "Failed" || run.Status == "Cancelled")
                    {
                        this.TestContext.WriteLine(run.Status + " status on ADF");
                        Assert.Fail("Pipeline run " + this.runId + " ended with status " + run.Status);
                    }

                    // Log before sleeping so the timestamp matches the status that was just read.
                    this.TestContext.WriteLine(run.Status + " : retry " + attempt.ToString() + " : " + DateTime.Now.ToLongTimeString());

                    // No point in waiting after the final poll.
                    if (attempt < this.retries)
                    {
                        System.Threading.Thread.Sleep(pollIntervalMs);
                    }
                }

                Assert.Fail("Did not complete to succeeded in " + (this.secondsToWait * this.retries).ToString() + " seconds");
            }
            catch (Exception e)
            {
                // Assertion failures also pass through here: they are logged and rethrown unchanged.
                System.Diagnostics.Debug.WriteLine(e.ToString());
                this.TestContext.WriteLine(e.ToString());
                throw;
            }
        }

//here adf client is being created 
 private async Task<bool> CreateAdfClient(string client_id, string client_key)
        {
            string subscription_id = ConfigurationManager.AppSettings.Get("subscription_id");//
            string tenant_id = ConfigurationManager.AppSettings.Get("tenant_id");
            var authenticationContext = new Microsoft.IdentityModel.Clients.ActiveDirectory.AuthenticationContext($"https://login.windows.net/{tenant_id}");
            var credential = new ClientCredential(clientId: client_id, clientSecret: client_key);
            var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
            if (result == null)
            {
                throw new InvalidOperationException("Failed to obtain the JWT token");
            }
            var cr = new TokenCredentials(result.AccessToken);
            ServiceClientCredentials cred = cr;
            this.innerClient = new DataFactoryManagementClient(cred);
            this.innerClient.SubscriptionId = subscription_id;

            return true;
        }

//Here the pipeline starts running
        private void StartPipeline(string resourceGroup, string dataFactory, string pipelineName)
        {
            System.Collections.Generic.Dictionary<string, object> parameters = new System.Collections.Generic.Dictionary<string, object>();
            parameters.Add("RunType", this.RunType);
            string runId = Guid.NewGuid().ToString();
            Console.WriteLine(runId);
            parameters.Add("RunId", runId);
            parameters.Add("run_date", this.run_date);
            parameters.Add("start_date", this.start_date);
            parameters.Add("startDate", this.startDate);
            parameters.Add("ContainerVersion", this.ContainerVersion);
            var dt = DateTime.Now;
            string description = "Executed by Automation Script on " + dt.ToShortDateString() + " : " + dt.ToShortTimeString();
            parameters.Add("RunDescription", description);
            this.TestContext.WriteLine("Description : " + description);
            CreateRunResponse response = this.innerClient.Pipelines.CreateRun(resourceGroup, dataFactory, pipelineName, null, false, null, parameters);
            this.runId = response.RunId;                   
        }

// Here is the code to verify that each activity passed in the activity list was successful.
 public bool VerifyActivities(string[] strActivitiesArray)
        {
            #region Activity Verification
            bool flag;
            flag = false;
            RunFilterParameters filterParams = new RunFilterParameters(DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10));
            ActivityRunsQueryResponse queryResponse = this.innerClient.ActivityRuns.QueryByPipelineRun(this.resourceGroup, this.dataFactory, this.runId, filterParams);

            for (int i = 0; i < strActivitiesArray.Count(); i++)
            {
                for (int j = 0; j < queryResponse.Value.Count; j++)
                {
                    if (queryResponse.Value[j].ActivityName.ToString() == strActivitiesArray[i] && queryResponse.Value[j].Status == "Succeeded")
                    {
                        flag = true;
                        break;
                    }
                    else
                        flag = false;
                }
            }
            return (flag);   
            #endregion
        }


So what did we achieve
1. Run an adf pipeline
2. Monitor its execution
3. Validate each activity in the adf pipeline is successful.
4. Validate entire adf execution is Successful

If you have close to 200 or more Azure Data Factory pipelines that need to be tested in a smart way, you can automate each of them and schedule a nightly run. The results will be available to view in the morning.

It also spares testers the monotony of repeating the same activity again and again in the long run.

Post a Comment

Previous Post Next Post