When handling a producer/consumer problem where there’s a single source of data to be consumed by multiple entities, you might arrive at a state where your consumers will start kicking and punching each other just to get a seat at the table – and that’s clearly not what you want. A black-eyed consumer will obviously lack the will to carry on much longer working for you.
So, how to add some sense to the impending blood-bath the moment our producer starts pouring out its golden nuggets? Academics will of course advise you a myriad of ultra-valid approaches that will evidently solve all your problems. Yet, at the expense of an extra week on your sprint and maybe a few grey-hairs (which you should already have if you’re working on multi-threaded systems).
Ranging from locks to lock-free, non-blocking, synchronized and all that jargon, it sure looks cool until you come up with crazy solutions like “Maybe I’ll use exceptions to communicate”. But do you really need the cannon to kill the fly? You should always measure your cost vs usefulness level when making these type of decisions. If your system is simple enough and your problem is making sure you don’t have consumers starving out and craving for some work, maybe one of your best options is to use some flavor of exponential backoff. Here’s some semi-credible Wikipedia page, give it a read and think about if your system would benefit from this. In that case, continue reading for some spectacular Quart.NET simple implementation!


Now that you’ve decided to use exponential backoff on your system, make sure you have a reference for Quartz.NET on your Visual Studio solution (you can grab it here).

1. Coding the jobs

The first step on this endeavor is to add your Producer/Consumer entry-points, meaning, two classes with an Execute method to be called by Quartz that will in turn execute your existing producer/consumer code. Go ahead and create two new classes (I called them Producer and Consumer, but let the creativity go wild here). Remember, Ctrl+. is your friend:

namespace CreateIt.Examples
{
   public class Producer
   {
      // The class that knows how to produce stuff
      ProducerWorker _producerWorker;
	  
      public Producer() 
	  {
         // initialize your worker -> dependency injection highly recommended, but I will keep it simple
         _producerWorker = new ProducerWorker();
      }
	  
      public void Execute(Quartz.IJobExecutionContext context)
	  {
         // start the worker
         _producerWorker.Produce();
      }
   }
   
   public class Consumer 
   {
      // The class that knows how to consumer stuff
      ConsumerWorker _consumer;
	  
      public Consumer() 
	  {
         // initialize your worker (note: use dependency injection)
         _consumerWorker = new ConsumerWorker();
      }
	  
      public void Execute(Quartz.IJobExecutionContext context) 
	  {
         // start the worker
         bool hasConsumed = _consumerWorker.Consume();
      }
   }
}

In the example above, ProducerWorker and ConsumerWorker should be your own classes, make sure you replace them with the correct name as this is just a guideline.

2. Scheduling

The next step is to configure the jobs to run in a given schedule. I’ll use the configuration file approach for this, but it should be doable using the code API as well (see the documentation for more on this topic).
Scheduling by configuration in Quartz is done with an XML file describing the jobs and triggers. It comprises a list of <schedule> tags containing a <job> and a list of <trigger>. You can add as many <schedule> tags as you want, even of the same job type, but you do have to give different names to the jobs. To start with, add three new schedules, one Producer and two Consumers:

<schedule>	 	 
  <job>	 	 
    <name>Producer</name>	 	 
    <job-type>CreateIt.Examples.Producer, CreateIt.Examples</job-type>	 	 
  </job>	 	 
  <trigger>	 	 
    <simple>	 	 
      <name>ProducerTrigger</name>	 	 
      <job-name>Producer</job-name>	 	 
      <repeat-count>0</repeat-count>
      <repeat-interval>10000</repeat-interval>
    </simple>	 	 
  </trigger>    	 	 
</schedule>	 	 
<schedule>	 	
  <job>	 	 
    <name>Consumer_1</name>	 	 
    <job-type>CreateIt.Examples.Consumer, CreateIt.Examples</job-type>	 	 
  </job>	 	 
  <trigger>	 	 
    <simple>	 	 
      <name>Consumer_1_Trigger</name>	 	 
      <job-name>Consumer_1</job-name>	 	 
      <repeat-count>0</repeat-count>
      <repeat-interval>1000</repeat-interval> 
    </simple>	 	 
  </trigger>    	 	 
</schedule>	 	 
<schedule>	 	 
  <job>	 	 
    <name>Consumer_2</name>	 	 
    <job-type>CreateIt.Examples.Consumer, CreateIt.Examples</job-type>	 	 
  </job>	 	 
  <trigger>	 	 
    <simple>	 	 
      <name>Consumer_2_Trigger</name>	 	 
      <job-name>Consumer_2</job-name>	 	 
      <repeat-count>0</repeat-count>
      <repeat-interval>1000</repeat-interval>
    </simple>	 	 
  </trigger>    	 	 
</schedule>

Notice the connection between the trigger and the job through the job name. You could in theory have several different jobs and triggers inside the same schedule if they are part of the same workflow, but to show the separation of responsibilities the example clearly divides the Producer and Consumer schedules. You can set triggers to fire at any given time with either unlimited or a limited set of repetitions (check the documentation for more info). I’ve also set the repeat-count parameter to 0 so that the jobs repeat forever, and the repeat-interval to 10s for the producer and 1s for the consumers.

You should now add the following to your App.config file to configure the schedule properties. Make sure your job XML file is in an accessible location, mine is named “quartz_jobs.xml” and is in the same folder as the App.config:

<configuration>
  <configSections>
    <section name="quartz" type="System.Configuration.NameValueSectionHandler, System, Version=1.0.5000.0,Culture=neutral, PublicKeyToken=b77a5c561934e089"/>
  </configSections>
  <quartz>
    <add key="quartz.scheduler.instanceName" value="ProducerConsumerScheduler"/>
    <add key="quartz.threadPool.type" value="Quartz.Simpl.SimpleThreadPool, Quartz"/>
    <add key="quartz.threadPool.threadCount" value="3"/>
    <add key="quartz.plugin.xml.type" value="Quartz.Plugin.Xml.XMLSchedulingDataProcessorPlugin, Quartz"/>
    <add key="quartz.plugin.xml.fileNames" value="~/quartz_jobs.xml"/>
  </quartz>
</configuration>

Looking inside the quartz section, only a handful of properties are actually configured. It should be pretty straightforward to grasp the meaning of each one by the key, so I won’t enter into details. You can meddle with a lot of options to make sure your scheduler is fine tuned for your needs, again, check the documentation.

For this example I’m asking for a simple thread pool with 3 threads (one producer, two consumers). I also set the XML plugin to XMLSchedulingDataProcessorPlugin so that my job definitions are understandable by quartz. You can add your own plugins, but the provided ones are usually more than enough.

3. Booting the scheduler – unleashing the beast

We’ve reached the fun part after all this configuration stuff, we will launch our scheduler!

With that in mind, I’m using an ordinary console application with no other logic – you can add this to your initialization workflow. Just make sure that the scheduler’s threads are kept alive in your application regardless of where and when you launch it:

namespace CreateIt.Examples
{
	/// <summary>
	/// Console application class for our Quartz scheduler
	/// </summary>
	public class QuartzSchedulerSample
	{
		/// <summary>
		/// The Quartz scheduler
		/// </summary>
		private Quartz.IScheduler _scheduler;
		
		/// <summary>
		/// The Quartz scheduler
		/// </summary>
		protected Quartz.IScheduler Scheduler
		{
			get
			{
				return _scheduler; 
			}
			
			set 
			{ 
				_scheduler = value; 
			}
		}
			
		/// <summary>
		/// The entry point where we will start scheduler
		/// </summary>
		public static void Main()
		{
			QuartzSchedulerSample quartz = new QuartzSchedulerSample();
			quartz.StartScheduler();

			Console.Write("Press any key to continue . . .");
			Console.ReadKey(true);

			quartz.ShutdownScheduler();
		}
		
		/// <summary>
		/// Starts the scheduler.
		/// </summary>
		public void StartScheduler()
		{
			try
			{
				Scheduler = Quartz.Impl.StdSchedulerFactory.GetDefaultScheduler();
				Scheduler.Start();
			}
			catch (Exception ex)
			{
				// Handle exception...
			}
		}

		/// <summary>
		/// Stops the scheduler.
		/// </summary>
		public void ShutdownScheduler()
		{
			Scheduler.Shutdown();
		}
	}
}

Easy, right? If you start your application you should now see your producer and consumer classes working concurrently, with the producer handing off work every 10 seconds and the consumers waking up every second to check on that.

4. Handling starvation with a backoff strategy

We’ve now reached the core of this post, that is, how to setup a simple backoff pattern on top of Quartz. So far we’ve implemented a simple schedule with one producer and two consumers that blindly wake up every 10s/1s to do stuff. You will hardly find a concurrency problem in these conditions, but if you up the consumer count a bit you will start seeing some workers waiting for longer and longer times before they can grab a piece of work. The plan is to make each consumer wait an increasing amount of time each time it fails to acquire or complete a unit of work. To achieve this effect, 3 steps are required by the consumers: update our backoff time, calculate the next execution date based on that time, and ask Quartz to reschedule the consumer (a self-reschedule, actually). An extra step is also needed to guarantee that state is keeped between executions.

NOTE: we will be working on the Consumer class we defined up on 1., so I will probably not mention it in the rest of the section.

4.1 Update the backoff time

First of all, we need some data structures to hold information regarding the current backoff state. There are several ways to achieve this, and it should be defined according to your system’s needs, but for the sake of our simple example I will set this up with hard-coded values. Use this as a guideline, but please use all the configuration utilities you should already have in place (if you don’t have any, make sure you do).

To start of, we need a way to store or produce an increasing list of times to wait. This can either be random, calculated or hard-coded values. I’m going for the last option, so I’ll add a constant array with the default backoff times in seconds:

/// <summary>
/// The default backoff times (in seconds)
/// </summary>
private const int[] _backoffTimes = new int[] { 1, 5, 60, 300, 3600, 86400 };

Next, we need some way of tracking where we stand, that is, which of the backoff times are we currently pointing to if we need to wait. A simple short will do:

/// <summary>
/// Current backoff index
/// </summary>
private short _backoffIndex = 0;

Now for the core logic, we add a method to update the backoff time given the execution result:

/// <summary>
/// Updates the backoff information given the result of an execution
/// </summary>
/// <param name="executionResult">The result of an execution</param>
private void UpdateBackoffIndex(bool executionResult)
{
     // If the execution was successfull, reset the backoff escalation
     if (executionResult)
     {
	     _backoffIndex = 0;
     }
     else
     { 
	     // If not, advance the index if we're not at the last position yet
	     if (_backoffIndex < (_backoffTimes.Length - 1))
	     {
	          _backoffIndex++;
	     }
     }
}

Finally, update the backoff index in the Consumer’s Execute method passing the execution result:

public void Execute(Quartz.IJobExecutionContext context) 
{
     // start the worker
     bool hasConsumed = _consumerWorker.Consume();
		 
     UpdateBackoffIndex(hasConsumed);
}

4.2 Calculate the next execution date

To find the next execution date is simple, just get the backoff seconds from the backoff array using the current index. This simple method does just that:

/// <summary>
/// Calculates the next execution date based on the current backoff index
/// </summary>
/// <returns>The next execution date</returns>
private DateTime GetNextExecutionDate()
{
     int backoffTime = _backoffTimes[_backoffIndex];
     return DateTime.Now.AddSeconds(backoffTime);
}

Do note that we haven’t yet added logic to manage the current index, we are just plumbing around.

4.3 Self-rescheduling

If you recall our Consumer class, the Execution method is given a context parameter to help the job access the scheduling infrastructure:

public void Execute(Quartz.IJobExecutionContext context);

You can access lots of interesting stuff from here (namely all the job details like the name and group for logging), but what we need to focus on is getting the current trigger so that we can “recycle” it with a new execution date. To reschedule the current job add the following method:

/// <summary>
/// Re-schedules the current job
/// </summary>
/// <param name="context">Job execution context</param>
private void Reschedule(IJobExecutionContext context)
{
     // Get the next execution date
     DateTime nextExecutionDate = GetNextExecutionDate();

     // Get the current trigger
     ITrigger currentTrigger = context.Trigger;
	
     // Get a new builder instance from the current trigger
     TriggerBuilder builder = currentTrigger.GetTriggerBuilder();

     // Create a new trigger instance using the builder from the current trigger
     // and set its start time to the next executed date obtained before.
     // This will use the same configuration parameters
     ITrigger newTrigger = builder
          .StartAt(nextExecutionDate)
          .ForJob(context.JobDetail)
          .Build();

     // Re-schedule the job using the current trigger key and the new trigger configuration
     context.Scheduler.RescheduleJob(currentTrigger.Key, newTrigger);
}

The method starts by getting the new execution date and the current trigger from the job context. It then asks the trigger for its own builder so that we can build a new one from that, which will give us the same configuration, so all we need to do is set the start date and the target job and build the new trigger. Finally, we ask the scheduler to re-schedule the current job using the new trigger (but recycling the current trigger key). This is the most confusing bit, but the comments should help you decipher the code.

4.4 Keeping state between executions

Now that we have our re-scheduling mechanism in place, there’s only one thing missing to make this work. One quirk of Quartz (semi-pun intended) is that jobs don’t maintain their states across executions unless you specifically instruct them to do so. This means that if we want to maintain our current backoff index, we need to instruct Quartz to save it in the JobDataMap. Thankfully, this is farily easy to achieve, with only two small adds to the current code.

First off, store the current index in the JobDataMap at the beginning of the Reschedule method:

context.JobDetail.JobDataMap["backoffIndex"] = _backoffIndex;

Then, read it at the beginning of the Execute method:

if (context.JobDetail.JobDataMap.ContainsKey("backoffIndex"))
{ 
	_backoffIndex = (short)context.JobDetail.JobDataMap["backoffIndex"];
}

In the first execution the data map won’t have the index key, so it will keep the default value of 0.

5. Wrapping up

That’s it, your consumers should not longer starve each other out! Also, if you add a bit of log you can follow through their decisions to backoff in case of difficulties and start to fine tune and adapt this solution for your needs.

Bear in mind that this is clearly not full proof and won’t handle your data-center sized farm, but it can help in those simple cases where all you want is something that works and is quick to implement.

Hope it helps!
Ricardo

LEAVE A REPLY

Please enter your comment!
Please enter your name here