www.icesr.com
IT运维工程师的摇篮

.NET平台MongoDB下使用JobStore存储Quartz.Net的Job,Trigger数据

创建一个控制台应用程序Program.cs:

 class Program
    {
        #region 变量
        private static JobStore jobStore;
        private static ITrigger trigger;
        private static IJobDetail jobDetail;
        private static TimeZoneInfo expectedTimeZone;
        private static TimeZoneInfo chinaTimeZone;
        private static JobKey jobKey = new JobKey("test");
        private static CronTriggerImpl retrievedCronTrigger;
        private static DateTimeOffset expectedNextFireTime;
        #endregion

        private static void Main(string[] args)
        {
            Console.WriteLine("------------------------------------------------");
            //1.首先创建一个作业调度池
            ISchedulerFactory schedulerFactory = new StdSchedulerFactory();
            IScheduler scheduler = schedulerFactory.GetScheduler();
            try
            {
                do
                {
                    //启动 scheduler
                    scheduler.Start();
                    //StoreCronTrigger();
                    StoreDailyIntervalTrigger();
                    // 将job和trigger进行绑定,并告知 quartz 调度器用trigger去执行job 
                    scheduler.ScheduleJob(jobDetail, trigger);
                    //IList<IOperableTrigger> list = jobStore.GetTriggersForJob(jobKey);
                    Console.WriteLine("Quartz服务成功启动!");
                    System.Threading.Thread.Sleep(300000);
                } while (true);
            }
            catch (Exception)
            {
                scheduler.Shutdown();
            }
            Console.ReadKey();
        }


        public static void StoreCronTrigger()
        {
            expectedTimeZone = TimeZoneInfo.FindSystemTimeZoneById("China Standard Time");
            
            // 计算任务的开始时间,DateBuilder.evenMinuteDate方法是取下一个整数分钟  
            // DateBuilder.evenMinuteDate(new Date()); 当前时间的下一分钟假如在执行到这一句时,时间为17:23:22,那么runTime就是17:24:00
            //该函数返回最近的整点分钟时间。
            DateTimeOffset runTime = DateBuilder.EvenMinuteDate(DateTime.Now); //DateTimeOffset.UtcNow

            //返回的时间是给定的时间的整数倍。
            //DateBuilder.nextGivenSecondDate(null, 15); 
            //系统时间:Tue Mar 10 09:09:27 CST 2015
            //开始时间:Tue Mar 10 09:09:30 CST 2015   30秒正好是15的倍数
            DateTimeOffset starRunTime = DateBuilder.NextGivenSecondDate(DateTime.Now, 1); 

            jobStore = new JobStore();
            jobStore.ClearAllSchedulingData();

            //定义job和自定义的TestJob进行绑定
            jobDetail = JobBuilder.Create<TestJob>().WithIdentity(jobKey).Build();

            //定义cronTrigger 一个即时触发的触发器,(每隔10秒进行重复执行)
            trigger = TriggerBuilder.Create()
                .ForJob(jobKey)
                .WithCronSchedule("0/10 * * * * ? *").StartNow() 
                //.WithCronSchedule("0/10 * * * * ? *").StartAt(runTime) 
                //.WithCronSchedule("0/10 * * * * ? *", x => x.InTimeZone(expectedTimeZone))  // 0 0 5 ? * *
                .Build();

            //保存到mongodb
            jobStore.StoreJobAndTrigger(jobDetail, (IOperableTrigger) trigger);
        }

        public static void StoreDailyIntervalTrigger()
        {
            TimeZoneInfo chinaTimeZone = TimeZoneInfo.FindSystemTimeZoneById("China Standard Time");

            DateTimeOffset runTime = DateBuilder.EvenMinuteDate(DateTime.Now);

            jobStore = new JobStore();
            jobStore.ClearAllSchedulingData();

            jobDetail = JobBuilder.Create<TestJob>().WithIdentity("test").Build(); 

           
            trigger = TriggerBuilder.Create()
                .ForJob(jobKey)
                //.WithDailyTimeIntervalSchedule(x => x.InTimeZone(chinaTimeZone).OnEveryDay() //创建并定义触发器的规则(每天执行一次时间为:时:分)
                .WithDailyTimeIntervalSchedule(a => a.WithIntervalInSeconds(30).OnEveryDay() 
                .StartingDailyAt(new TimeOfDay(12, 00))
                .EndingDailyAt(new TimeOfDay(17, 50))).Build();

            //保存jobDetail,trigger数据到mongodb
            jobStore.StoreJobAndTrigger(jobDetail, (IOperableTrigger) trigger);
        }

        public static void TestCronTrigger()
        {
            retrievedCronTrigger = (CronTriggerImpl) jobStore.GetTriggersForJob(jobKey).Single();
            retrievedCronTrigger.TimeZone.ShouldEqual(expectedTimeZone);
            //-----------------------------------------------------------------------------------
            retrievedCronTrigger.Triggered(null);
            var nextFireTimeUtc = retrievedCronTrigger.GetNextFireTimeUtc();
            nextFireTimeUtc.ShouldEqual(expectedNextFireTime);
        }

        public static void TestDailyIntervalTrigger()
        {
            var triggersForJob = jobStore.GetTriggersForJob(jobKey).ToList();
            triggersForJob.Count.ShouldEqual(1);
            //-----------------------------------------------------------------------------------
            var trigger = (DailyTimeIntervalTriggerImpl) jobStore.GetTriggersForJob(jobKey).Single();
            trigger.TimeZone.ShouldEqual(chinaTimeZone);
            //-----------------------------------------------------------------------------------
            trigger.StartTimeOfDay.ShouldEqual(new TimeOfDay(10, 10));
            trigger.EndTimeOfDay.ShouldEqual(new TimeOfDay(10, 20));
            //------------------------------------------------------------------------------------------
            trigger.Triggered(null);
            trigger.GetNextFireTimeUtc().ShouldEqual(expectedNextFireTime);
        }
    }

TestJob.cs代码如下:

 public class TestJob: IJob
    {
        public static int count=0;
        public void Execute(IJobExecutionContext context)
        {
             Console.WriteLine(string.Format("执行{0}次了!时间:"+DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"), ++count));
            
        }
    }

JobStore.cs核心类:

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Globalization;
using System.Linq;
using System.Threading;
using Common.Logging;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Conventions;
using MongoDB.Driver;
using MongoDB.Driver.Builders;
using Quartz.Impl.Matchers;
using Quartz.Impl.Triggers;
using Quartz.Spi;

namespace Quartz.Impl.MongoDB
{
    public class JobStore : IJobStore
    {
        private readonly object lockObject = new object();
        private TimeSpan misfireThreshold = TimeSpan.FromSeconds(5);
        private ISchedulerSignaler signaler;

        private readonly ILog log;

        private MongoDatabase database;
        private string instanceId;
        private string instanceName;

        private MongoCollection Calendars
        {
            get { return this.database.GetCollection(instanceName + ".Calendars"); }
        }

        private MongoCollection Jobs
        {
            get { return this.database.GetCollection(instanceName + ".Jobs"); }
        }

        private MongoCollection Triggers
        {
            get { return this.database.GetCollection(instanceName + ".Triggers"); }
        }

        private MongoCollection PausedTriggerGroups
        {
            get { return this.database.GetCollection(instanceName + ".PausedTriggerGroups"); }
        }

        private MongoCollection PausedJobGroups
        {
            get { return this.database.GetCollection(instanceName + ".PausedJobGroups"); }
        }

        private MongoCollection BlockedJobs
        {
            get { return this.database.GetCollection(instanceName + ".BlockedJobs"); }
        }

        private MongoCollection Schedulers
        {
            get { return this.database.GetCollection(instanceName + ".Schedulers"); }
        }

        public static string DefaultConnectionString { get; set; }

        /// <summary>
        /// 初始化<see cref="JobStore"/> 类的新实例。
        /// </summary>
        public JobStore()
        {
            log = LogManager.GetLogger(GetType());

            string connectionString = ConfigurationManager.ConnectionStrings["quartznet-mongodb"] != null
                ? ConfigurationManager.ConnectionStrings["quartznet-mongodb"].ConnectionString
                : DefaultConnectionString;

            //如果没有连接字符串使用,然后抛出一个异常中止建立
            if (string.IsNullOrWhiteSpace(connectionString))
            {
                throw new ApplicationException("Connection string is missing for the MongoDB job store.");
            }

            lock (lockObject)
            {
                var urlBuilder = new MongoUrlBuilder(connectionString);
                var client = new MongoClient(urlBuilder.ToMongoUrl());
                this.database = client.GetServer().GetDatabase("quartznet_tests");
                this.instanceName = "Quartz";
                //client = new MongoClient(ConfigurationManager.ConnectionStrings["mongoDB"].ConnectionString);
                //this.database = client.GetDatabase(databaseName);
                //this.collectionName = collectionName;  
            }
        }

        /// <summary>
        /// 初始化<see cref="JobStore"/>类。
        /// </summary>
        static JobStore()
        {
            var myConventions = new ConventionPack();
            var idConvention = new NamedIdMemberConvention("Id", "Key");
            myConventions.Add(idConvention);
            ConventionRegistry.Register(
                "quartz-net-mongodb",
                myConventions,
                t => t.FullName.StartsWith("Quartz.")
                );

            BsonSerializer.RegisterSerializer(
                typeof (JobKey),
                new JobKeySerializer()
                );

            BsonSerializer.RegisterSerializer(
                typeof (TriggerKey),
                new TriggerKeySerializer()
                );

            BsonSerializer.RegisterSerializer(
                typeof (JobDetailImpl),
                new JobDetailImplSerializer()
                );

            BsonClassMap.RegisterClassMap<JobDetailImpl>(cm =>
            {
                cm.AutoMap();
                cm.SetDiscriminator("JobDetailImpl");
            });

            BsonSerializer.RegisterSerializer(
                typeof (JobDataMap),
                new JobDataMapSerializer()
                );

            BsonSerializer.RegisterSerializer(
                typeof (DateTimeOffset),
                new DateTimeOffsetSerializer()
                );

            BsonSerializer.RegisterGenericSerializerDefinition(typeof (Collection.ISet<>), typeof (SetSerializer<>));

            BsonClassMap.RegisterClassMap<AbstractTrigger>(cm =>
            {
                cm.AutoMap();

                cm.MapField(x => x.Name);
                cm.MapField(x => x.Group);
                cm.MapField(x => x.JobName);
                cm.MapField(x => x.JobGroup);
                cm.MapField(x => x.JobKey);
                cm.MapField(x => x.Name);
                cm.MapField(x => x.Group);
                cm.MapField(x => x.Description);
                cm.MapField(x => x.CalendarName);
                cm.MapField(x => x.JobDataMap);
                cm.MapField(x => x.MisfireInstruction);
                cm.MapField(x => x.FireInstanceId);
                cm.MapField(x => x.EndTimeUtc);
                cm.MapField(x => x.StartTimeUtc);
                cm.MapField(x => x.Priority);

                cm.SetIsRootClass(true);
            });

            BsonClassMap.RegisterClassMap<CalendarIntervalTriggerImpl>(cm =>
            {
                cm.AutoMap();
                cm.MapField("complete");
                cm.MapField("nextFireTimeUtc");
                cm.MapField("previousFireTimeUtc");
                cm.SetIgnoreExtraElements(true);
            });

            BsonClassMap.RegisterClassMap<CronTriggerImpl>(cm =>
            {
                cm.AutoMap();

                cm.MapField(x => x.CronExpressionString);
                cm.MapField(x => x.TimeZone);

                cm.MapField("nextFireTimeUtc");
                cm.MapField("previousFireTimeUtc");
                cm.MapField(x => x.TimeZone).SetSerializer(new TimeZoneInfoSerializer());
                cm.SetIgnoreExtraElements(true);
            });

            BsonSerializer.RegisterSerializer(typeof (TimeOfDay), new TimeOfDaySerializer());

            BsonClassMap.RegisterClassMap<DailyTimeIntervalTriggerImpl>(cm =>
            {
                cm.AutoMap();
                cm.MapField("complete");
                cm.MapField("nextFireTimeUtc");
                cm.MapField("previousFireTimeUtc");
                cm.MapField(x => x.TimeZone).SetSerializer(new TimeZoneInfoSerializer());
                cm.SetIgnoreExtraElements(true);
            });

            BsonClassMap.RegisterClassMap<SimpleTriggerImpl>(cm =>
            {
                cm.AutoMap();
                cm.MapField("complete");
                cm.MapField("nextFireTimeUtc");
                cm.MapField("previousFireTimeUtc");
                cm.SetIgnoreExtraElements(true);
            });
        }

        [TimeSpanParseRule(TimeSpanParseRule.Milliseconds)]
        public virtual TimeSpan MisfireThreshold
        {
            get { return misfireThreshold; }
            set
            {
                if (value.TotalMilliseconds < 1)
                {
                    throw new ArgumentException("Misfirethreashold must be larger than 0");
                }
                misfireThreshold = value;
            }
        }

        private static long ftrCtr = SystemTime.UtcNow().Ticks;

        /// <summary>
        ///获取fired触发记录id。
        /// </summary>
        /// <returns>fired触发记录id.</returns>
        protected virtual string GetFiredTriggerRecordId()
        {
            long value = Interlocked.Increment(ref ftrCtr);
            return Convert.ToString(value, CultureInfo.InvariantCulture);
        }

        public virtual void Initialize(ITypeLoadHelper loadHelper, ISchedulerSignaler s)
        {
            signaler = s;
            Log.Info("MongoDB JobStore initialized.");
        }

        public virtual void SchedulerStarted()
        {
            // nothing to do
        }

        public void SchedulerPaused()
        {
            this.Schedulers.Update(
                Query.EQ("_id", this.instanceId),
                Update.Set("State", "Paused"));
        }

        public void SchedulerResumed()
        {
            this.Schedulers.Update(
                Query.EQ("_id", this.instanceId),
                Update.Set("State", "Resuming"));
        }

        public virtual void Shutdown()
        {
            this.Schedulers.Remove(
                Query.EQ("_id", this.instanceId));

            this.Triggers.Update(
                Query.EQ("SchedulerInstanceId", this.instanceId),
                Update.Unset("SchedulerInstanceId")
                    .Set("State", "Waiting"));
        }

        public virtual bool SupportsPersistence
        {
            get { return true; }
        }


        public void ClearAllSchedulingData()
        {
            lock (lockObject)
            {
                // unschedule jobs (delete triggers)
                this.Triggers.RemoveAll();
                this.PausedTriggerGroups.RemoveAll();

                // delete jobs
                this.Jobs.RemoveAll();
                this.BlockedJobs.RemoveAll();
                this.PausedJobGroups.RemoveAll();

                // delete calendars
                this.Calendars.RemoveAll();
            }
        }


        protected ILog Log
        {
            get { return log; }
        }

        public virtual void StoreJobAndTrigger(IJobDetail newJob, IOperableTrigger newTrigger)
        {
            StoreJob(newJob, false);
            StoreTrigger(newTrigger, false);
        }

        public virtual bool IsJobGroupPaused(string groupName)
        {
            var result = this.PausedJobGroups.FindOneByIdAs<BsonDocument>(groupName);
            return !result.IsBsonNull;
        }

        public virtual bool IsTriggerGroupPaused(string groupName)
        {
            var result = this.PausedTriggerGroups.FindOneByIdAs<BsonDocument>(groupName);
            return !result.IsBsonNull;
        }

        public virtual void StoreJob(IJobDetail newJob, bool replaceExisting)
        {
            bool repl = false;

            lock (lockObject)
            {
                if (this.CheckExists(newJob.Key))
                {
                    if (!replaceExisting)
                    {
                        throw new ObjectAlreadyExistsException(newJob);
                    }

                    repl = true;
                }

                if (!repl)
                {
                    // try insert new
                    this.Jobs.Insert(newJob.ToBsonDocument());
                }
                else
                {
                    // force upsert
                    this.Jobs.Save(newJob.ToBsonDocument());
                }
            }
        }

        public void StoreJobsAndTriggers(IDictionary<IJobDetail, Collection.ISet<ITrigger>> triggersAndJobs,
            bool replace)
        {
            var dictionary = new Dictionary<IJobDetail, IList<ITrigger>>();

            foreach (var triggersAndJob in triggersAndJobs)
            {
                dictionary.Add(triggersAndJob.Key, triggersAndJob.Value.ToList());
            }

            StoreJobsAndTriggers(new Dictionary<IJobDetail, IList<ITrigger>>(dictionary), replace);
        }


        public virtual bool RemoveJob(JobKey jobKey)
        {
            bool found;

            lock (lockObject)
            {
                // keep separated to clean up any staled trigger
                IList<IOperableTrigger> triggersForJob = this.GetTriggersForJob(jobKey);
                foreach (IOperableTrigger trigger in triggersForJob)
                {
                    this.RemoveTrigger(trigger.Key);
                }

                found = this.CheckExists(jobKey);

                if (found)
                {
                    this.Jobs.Remove(
                        Query.EQ("_id", jobKey.ToBsonDocument()));

                    this.BlockedJobs.Remove(
                        Query.EQ("_id", jobKey.ToBsonDocument()));

                    var others = this.Jobs.FindAs<BsonDocument>(
                        Query.EQ("Group", jobKey.Group));

                    if (others.Count() == 0)
                    {
                        this.PausedJobGroups.Remove(
                            Query.EQ("_id", jobKey.Group));
                    }
                }
            }

            return found;
        }

        public bool RemoveJobs(IList<JobKey> jobKeys)
        {
            bool allFound = true;

            lock (lockObject)
            {
                foreach (JobKey key in jobKeys)
                {
                    allFound = RemoveJob(key) && allFound;
                }
            }

            return allFound;
        }

        public bool RemoveTriggers(IList<TriggerKey> triggerKeys)
        {
            bool allFound = true;

            lock (lockObject)
            {
                foreach (TriggerKey key in triggerKeys)
                {
                    allFound = RemoveTrigger(key) && allFound;
                }
            }

            return allFound;
        }

        public void StoreJobsAndTriggers(IDictionary<IJobDetail, IList<ITrigger>> triggersAndJobs, bool replace)
        {
            lock (lockObject)
            {
                // make sure there are no collisions...
                if (!replace)
                {
                    foreach (IJobDetail job in triggersAndJobs.Keys)
                    {
                        if (CheckExists(job.Key))
                        {
                            throw new ObjectAlreadyExistsException(job);
                        }
                        foreach (ITrigger trigger in triggersAndJobs[job])
                        {
                            if (CheckExists(trigger.Key))
                            {
                                throw new ObjectAlreadyExistsException(trigger);
                            }
                        }
                    }
                }
                // do bulk add...
                foreach (IJobDetail job in triggersAndJobs.Keys)
                {
                    StoreJob(job, true);
                    foreach (ITrigger trigger in triggersAndJobs[job])
                    {
                        StoreTrigger((IOperableTrigger) trigger, true);
                    }
                }
            }
        }


        public virtual bool RemoveTrigger(TriggerKey triggerKey)
        {
            return RemoveTrigger(triggerKey, true);
        }

        public virtual void StoreTrigger(IOperableTrigger newTrigger, bool replaceExisting)
        {
            lock (lockObject)
            {
                if (this.CheckExists(newTrigger.Key))
                {
                    if (!replaceExisting)
                    {
                        throw new ObjectAlreadyExistsException(newTrigger);
                    }

                    // don't delete orphaned job, this trigger has the job anyways
                    this.RemoveTrigger(newTrigger.Key, false);
                }

                if (this.RetrieveJob(newTrigger.JobKey) == null)
                {
                    throw new JobPersistenceException("The job (" + newTrigger.JobKey +
                                                      ") referenced by the trigger does not exist.");
                }

                var document = newTrigger.ToBsonDocument();
                string state = "Waiting";

                if (this.PausedTriggerGroups.FindOneByIdAs<BsonDocument>(newTrigger.Key.Group) != null
                    || this.PausedJobGroups.FindOneByIdAs<BsonDocument>(newTrigger.JobKey.Group) != null)
                {
                    state = "Paused";
                    if (this.BlockedJobs.FindOneByIdAs<BsonDocument>(newTrigger.JobKey.ToBsonDocument()) != null)
                    {
                        state = "PausedAndBlocked";
                    }
                }
                else if (this.BlockedJobs.FindOneByIdAs<BsonDocument>(newTrigger.JobKey.ToBsonDocument()) != null)
                {
                    state = "Blocked";
                }

                document.Add("State", state);
                this.Triggers.Save(document);
            }
        }

        public virtual bool RemoveTrigger(TriggerKey key, bool removeOrphanedJob)
        {
            bool found;
            lock (lockObject)
            {
                var trigger = this.RetrieveTrigger(key);
                found = trigger != null;

                if (found)
                {
                    this.Triggers.Remove(
                        Query.EQ("_id", trigger.Key.ToBsonDocument()));

                    if (removeOrphanedJob)
                    {
                        IJobDetail jobDetail = this.RetrieveJob(trigger.JobKey);
                        IList<IOperableTrigger> trigs = this.GetTriggersForJob(jobDetail.Key);
                        if ((trigs == null
                             || trigs.Count == 0)
                            && !jobDetail.Durable)
                        {
                            if (this.RemoveJob(jobDetail.Key))
                            {
                                signaler.NotifySchedulerListenersJobDeleted(jobDetail.Key);
                            }
                        }
                    }
                }
            }

            return found;
        }

        public virtual bool ReplaceTrigger(TriggerKey triggerKey, IOperableTrigger newTrigger)
        {
            bool found;

            lock (lockObject)
            {
                IOperableTrigger oldTrigger = this.Triggers.FindOneByIdAs<IOperableTrigger>(triggerKey.ToBsonDocument());
                found = oldTrigger != null;

                if (found)
                {
                    if (!oldTrigger.JobKey.Equals(newTrigger.JobKey))
                    {
                        throw new JobPersistenceException(
                            "New trigger is not related to the same job as the old trigger.");
                    }

                    this.RemoveTrigger(triggerKey);

                    try
                    {
                        this.StoreTrigger(newTrigger, false);
                    }
                    catch (JobPersistenceException)
                    {
                        this.StoreTrigger(oldTrigger, false); // put previous trigger back...
                        throw;
                    }
                }
            }

            return found;
        }


        public virtual IJobDetail RetrieveJob(JobKey jobKey)
        {
            lock (lockObject)
            {
                return this.Jobs
                    .FindOneByIdAs<IJobDetail>(jobKey.ToBsonDocument());
            }
        }


        public virtual IOperableTrigger RetrieveTrigger(TriggerKey triggerKey)
        {
            lock (lockObject)
            {
                return this.Triggers
                    .FindOneByIdAs<Spi.IOperableTrigger>(triggerKey.ToBsonDocument());
            }
        }


        public bool CheckExists(JobKey jobKey)
        {
            lock (lockObject)
            {
                return this.Jobs.FindOneByIdAs<BsonDocument>(jobKey.ToBsonDocument()) != null;
            }
        }


        public bool CheckExists(TriggerKey triggerKey)
        {
            lock (lockObject)
            {
                return this.Triggers.FindOneByIdAs<BsonDocument>(triggerKey.ToBsonDocument()) != null;
            }
        }


        public virtual TriggerState GetTriggerState(TriggerKey triggerKey)
        {
            lock (lockObject)
            {
                BsonDocument triggerState = this.Triggers.FindOneByIdAs<BsonDocument>(triggerKey.ToBsonDocument());

                if (triggerState.IsBsonNull)
                {
                    return TriggerState.None;
                }
                if (triggerState["State"] == "Complete")
                {
                    return TriggerState.Complete;
                }
                if (triggerState["State"] == "Paused")
                {
                    return TriggerState.Paused;
                }
                if (triggerState["State"] == "PausedAndBlocked")
                {
                    return TriggerState.Paused;
                }
                if (triggerState["State"] == "Blocked")
                {
                    return TriggerState.Blocked;
                }
                if (triggerState["State"] == "Error")
                {
                    return TriggerState.Error;
                }

                return TriggerState.Normal;
            }
        }


        public virtual void StoreCalendar(string name, ICalendar calendar, bool replaceExisting,
            bool updateTriggers)
        {
            CalendarWrapper calendarWrapper = new CalendarWrapper()
            {
                Name = name,
                Calendar = calendar
            };

            lock (lockObject)
            {
                if (this.Calendars.FindOneByIdAs<BsonDocument>(name) != null
                    && replaceExisting == false)
                {
                    throw new ObjectAlreadyExistsException(string.Format(CultureInfo.InvariantCulture,
                        "Calendar with name '{0}' already exists.", name));
                }

                this.Calendars.Save(calendarWrapper);

                if (updateTriggers)
                {
                    var triggers = this.Triggers.FindAs<IOperableTrigger>(Query.EQ("CalendarName", name));
                    foreach (IOperableTrigger trigger in triggers)
                    {
                        trigger.UpdateWithNewCalendar(calendar, MisfireThreshold);
                        this.Triggers.Save(trigger);
                    }
                }
            }
        }


        public virtual bool RemoveCalendar(string calName)
        {
            if (this.Triggers.FindAs<BsonDocument>(Query.EQ("CalendarName", calName)) != null)
            {
                throw new JobPersistenceException("Calender cannot be removed if it is referenced by a Trigger!");
            }

            this.Calendars.Remove(
                Query.EQ("_id", calName));

            return true;
        }


        public virtual ICalendar RetrieveCalendar(string calName)
        {
            lock (lockObject)
            {
                CalendarWrapper calendarWrapper = this.Calendars
                    .FindOneByIdAs<CalendarWrapper>(calName);

                if (calendarWrapper != null)
                {
                    return calendarWrapper.Calendar;
                }

                return null;
            }
        }


        public virtual int GetNumberOfJobs()
        {
            lock (lockObject)
            {
                return (int) this.Jobs.Count();
            }
        }


        public virtual int GetNumberOfTriggers()
        {
            lock (lockObject)
            {
                return (int) this.Triggers.Count();
            }
        }


        public virtual int GetNumberOfCalendars()
        {
            lock (lockObject)
            {
                return (int) this.Calendars.Count();
            }
        }

        public virtual Collection.ISet<JobKey> GetJobKeys(GroupMatcher<JobKey> matcher)
        {
            lock (lockObject)
            {
                var result = this.Jobs
                    .FindAs<IJobDetail>(
                        Query.EQ("Group", matcher.CompareToValue))
                    .Select(j => j.Key);

                return new Collection.HashSet<JobKey>(result);
            }
        }


        public virtual IList<string> GetCalendarNames()
        {
            lock (lockObject)
            {
                return this.Calendars
                    .Distinct("Name")
                    .Select(g => g.AsString)
                    .ToList();
            }
        }


        public virtual Collection.ISet<TriggerKey> GetTriggerKeys(GroupMatcher<TriggerKey> matcher)
        {
            lock (lockObject)
            {
                var result = this.Triggers
                    .FindAs<Spi.IOperableTrigger>(
                        Query.EQ("Group", matcher.CompareToValue))
                    .Select(t => t.Key);

                return new Collection.HashSet<TriggerKey>(result);
            }
        }


        public virtual IList<string> GetJobGroupNames()
        {
            lock (lockObject)
            {
                return this.Jobs
                    .Distinct("Group")
                    .Select(g => g.AsString)
                    .ToList();
            }
        }


        public virtual IList<string> GetTriggerGroupNames()
        {
            lock (lockObject)
            {
                return this.Triggers
                    .Distinct("Group")
                    .Select(g => g.AsString)
                    .ToList();
            }
        }


        public virtual IList<IOperableTrigger> GetTriggersForJob(JobKey jobKey)
        {
            lock (lockObject)
            {
                return this.Triggers
                    .FindAs<Spi.IOperableTrigger>(
                        Query.EQ("JobKey", jobKey.ToBsonDocument()))
                    .ToList();
            }
        }

        public virtual void PauseTrigger(TriggerKey triggerKey)
        {
            lock (lockObject)
            {
                this.Triggers.Update(
                    Query.And(
                        Query.EQ("_id", triggerKey.ToBsonDocument()),
                        Query.EQ("State", "Blocked")),
                    Update.Set("State", "PausedAndBlocked"));

                this.Triggers.Update(
                    Query.And(
                        Query.EQ("_id", triggerKey.ToBsonDocument()),
                        Query.NE("State", "Blocked")),
                    Update.Set("State", "Paused"));
            }
        }


        public virtual Collection.ISet<string> PauseTriggers(GroupMatcher<TriggerKey> matcher)
        {
            IList<string> pausedGroups;

            lock (lockObject)
            {
                pausedGroups = new List<string>();

                StringOperator op = matcher.CompareWithOperator;
                if (op == StringOperator.Equality)
                {
                    this.PausedTriggerGroups.Save(
                        new BsonDocument(
                            new BsonElement("_id", matcher.CompareToValue)));

                    pausedGroups.Add(matcher.CompareToValue);
                }
                else
                {
                    IList<string> groups = this.GetTriggerGroupNames();

                    foreach (string group in groups)
                    {
                        if (op.Evaluate(group, matcher.CompareToValue))
                        {
                            this.PausedTriggerGroups.Save(
                                new BsonDocument(
                                    new BsonElement("_id", matcher.CompareToValue)));

                            pausedGroups.Add(matcher.CompareToValue);
                        }
                    }
                }

                foreach (string pausedGroup in pausedGroups)
                {
                    Collection.ISet<TriggerKey> keys =
                        this.GetTriggerKeys(GroupMatcher<TriggerKey>.GroupEquals(pausedGroup));

                    foreach (TriggerKey key in keys)
                    {
                        this.PauseTrigger(key);
                    }
                }
            }
            return new Collection.HashSet<string>(pausedGroups);
        }

        public virtual void PauseJob(JobKey jobKey)
        {
            lock (lockObject)
            {
                IList<IOperableTrigger> triggersForJob = this.GetTriggersForJob(jobKey);
                foreach (IOperableTrigger trigger in triggersForJob)
                {
                    this.PauseTrigger(trigger.Key);
                }
            }
        }


        public virtual IList<string> PauseJobs(GroupMatcher<JobKey> matcher)
        {
            List<string> pausedGroups = new List<String>();
            lock (lockObject)
            {
                StringOperator op = matcher.CompareWithOperator;
                if (op == StringOperator.Equality)
                {
                    this.PausedJobGroups.Save(
                        new BsonDocument(
                            new BsonElement("_id", matcher.CompareToValue)));

                    pausedGroups.Add(matcher.CompareToValue);
                }
                else
                {
                    IList<string> groups = this.GetJobGroupNames();

                    foreach (string group in groups)
                    {
                        if (op.Evaluate(group, matcher.CompareToValue))
                        {
                            this.PausedJobGroups.Save(
                                new BsonDocument(
                                    new BsonElement("_id", matcher.CompareToValue)));

                            pausedGroups.Add(matcher.CompareToValue);
                        }
                    }
                }

                foreach (string groupName in pausedGroups)
                {
                    foreach (JobKey jobKey in GetJobKeys(GroupMatcher<JobKey>.GroupEquals(groupName)))
                    {
                        IList<IOperableTrigger> triggers = this.GetTriggersForJob(jobKey);
                        foreach (IOperableTrigger trigger in triggers)
                        {
                            this.PauseTrigger(trigger.Key);
                        }
                    }
                }
            }

            return pausedGroups;
        }


        public virtual void ResumeTrigger(TriggerKey triggerKey)
        {
            lock (lockObject)
            {
                IOperableTrigger trigger = this.Triggers.FindOneByIdAs<IOperableTrigger>(triggerKey.ToBsonDocument());

                // does the trigger exist?
                if (trigger == null)
                {
                    return;
                }

                BsonDocument triggerState = this.Triggers.FindOneByIdAs<BsonDocument>(triggerKey.ToBsonDocument());
                // if the trigger is not paused resuming it does not make sense...
                if (triggerState["State"] != "Paused" &&
                    triggerState["State"] != "PausedAndBlocked")
                {
                    return;
                }

                if (this.BlockedJobs.FindOneByIdAs<BsonDocument>(trigger.JobKey.ToBsonDocument()) != null)
                {
                    triggerState["State"] = "Blocked";
                }
                else
                {
                    triggerState["State"] = "Waiting";
                }

                this.ApplyMisfire(trigger);

                this.Triggers.Save(triggerState);
            }
        }


        public virtual IList<string> ResumeTriggers(GroupMatcher<TriggerKey> matcher)
        {
            Collection.ISet<string> groups = new Collection.HashSet<string>();
            lock (lockObject)
            {
                Collection.ISet<TriggerKey> keys = this.GetTriggerKeys(matcher);

                foreach (TriggerKey triggerKey in keys)
                {
                    groups.Add(triggerKey.Group);
                    IOperableTrigger trigger = this.Triggers.FindOneByIdAs<IOperableTrigger>(triggerKey.ToBsonDocument());
                    var pausedJobGroup = this.PausedJobGroups.FindOneByIdAs<string>(trigger.JobKey.Group);
                    if (pausedJobGroup != null)
                    {
                        continue;
                    }

                    this.ResumeTrigger(triggerKey);
                }

                foreach (String group in groups)
                {
                    this.PausedTriggerGroups.Remove(
                        Query.EQ("_id", group));
                }
            }

            return new List<string>(groups);
        }


        public virtual void ResumeJob(JobKey jobKey)
        {
            lock (lockObject)
            {
                IList<IOperableTrigger> triggersForJob = GetTriggersForJob(jobKey);
                foreach (IOperableTrigger trigger in triggersForJob)
                {
                    this.ResumeTrigger(trigger.Key);
                }
            }
        }


        public virtual Collection.ISet<string> ResumeJobs(GroupMatcher<JobKey> matcher)
        {
            Collection.ISet<string> resumedGroups = new Collection.HashSet<string>();
            lock (lockObject)
            {
                Collection.ISet<JobKey> keys = GetJobKeys(matcher);

                foreach (string pausedJobGroup in this.PausedJobGroups.FindAllAs<string>())
                {
                    if (matcher.CompareWithOperator.Evaluate(pausedJobGroup, matcher.CompareToValue))
                    {
                        resumedGroups.Add(pausedJobGroup);
                    }
                }

                this.PausedTriggerGroups.Remove(
                    Query.All("_id", new BsonArray(resumedGroups)));

                foreach (JobKey key in keys)
                {
                    IList<IOperableTrigger> triggers = GetTriggersForJob(key);
                    foreach (IOperableTrigger trigger in triggers)
                    {
                        ResumeTrigger(trigger.Key);
                    }
                }
            }

            return resumedGroups;
        }


        public virtual void PauseAll()
        {
            lock (lockObject)
            {
                IList<string> triggerGroupNames = GetTriggerGroupNames();

                foreach (string groupName in triggerGroupNames)
                {
                    this.PauseTriggers(GroupMatcher<TriggerKey>.GroupEquals(groupName));
                }
            }
        }


        public virtual void ResumeAll()
        {
            lock (lockObject)
            {
                // TODO need a match all here!
                this.PausedJobGroups.RemoveAll();
                IList<string> triggerGroupNames = this.GetTriggerGroupNames();

                foreach (string groupName in triggerGroupNames)
                {
                    this.ResumeTriggers(GroupMatcher<TriggerKey>.GroupEquals(groupName));
                }
            }
        }


        protected virtual bool ApplyMisfire(IOperableTrigger trigger)
        {
            DateTimeOffset misfireTime = SystemTime.UtcNow();
            if (MisfireThreshold > TimeSpan.Zero)
            {
                misfireTime = misfireTime.AddMilliseconds(-1*MisfireThreshold.TotalMilliseconds);
            }

            DateTimeOffset? tnft = trigger.GetNextFireTimeUtc();
            if (!tnft.HasValue || tnft.Value > misfireTime
                || trigger.MisfireInstruction == MisfireInstruction.IgnoreMisfirePolicy)
            {
                return false;
            }

            ICalendar cal = null;
            if (trigger.CalendarName != null)
            {
                cal = this.RetrieveCalendar(trigger.CalendarName);
            }

            signaler.NotifyTriggerListenersMisfired(trigger);

            trigger.UpdateAfterMisfire(cal);
            this.StoreTrigger(trigger, true);

            if (!trigger.GetNextFireTimeUtc().HasValue)
            {
                this.Triggers.Update(
                    Query.EQ("_id", trigger.Key.ToBsonDocument()),
                    Update.Set("State", "Complete"));

                signaler.NotifySchedulerListenersFinalized(trigger);
            }
            else if (tnft.Equals(trigger.GetNextFireTimeUtc()))
            {
                return false;
            }

            return true;
        }


        public virtual IList<IOperableTrigger> AcquireNextTriggers(DateTimeOffset noLaterThan, int maxCount,
            TimeSpan timeWindow)
        {
            lock (lockObject)
            {
                this.Schedulers.Save(new BsonDocument()
                    .SetElement(new BsonElement("_id", this.instanceId))
                    .SetElement(new BsonElement("Expires", (SystemTime.Now() + new TimeSpan(0, 10, 0)).UtcDateTime))
                    .SetElement(new BsonElement("State", "Running")));

                this.Schedulers.Remove(
                    Query.LT("Expires", SystemTime.Now().UtcDateTime));

                IEnumerable<BsonValue> activeInstances = this.Schedulers.Distinct("_id");

                this.Triggers.Update(
                    Query.NotIn("SchedulerInstanceId", activeInstances),
                    Update.Unset("SchedulerInstanceId")
                        .Set("State", "Waiting"));

                List<IOperableTrigger> result = new List<IOperableTrigger>();
                Collection.ISet<JobKey> acquiredJobKeysForNoConcurrentExec = new Collection.HashSet<JobKey>();
                DateTimeOffset? firstAcquiredTriggerFireTime = null;

                var candidates = this.Triggers.FindAs<Spi.IOperableTrigger>(
                    Query.And(
                        Query.EQ("State", "Waiting"),
                        Query.LTE("nextFireTimeUtc", (noLaterThan + timeWindow).UtcDateTime)))
                    .OrderBy(t => t.GetNextFireTimeUtc()).ThenByDescending(t => t.Priority);

                foreach (IOperableTrigger trigger in candidates)
                {
                    if (trigger.GetNextFireTimeUtc() == null)
                    {
                        continue;
                    }


                    if (firstAcquiredTriggerFireTime != null
                        && trigger.GetNextFireTimeUtc() > (firstAcquiredTriggerFireTime.Value + timeWindow))
                    {
                        break;
                    }

                    if (this.ApplyMisfire(trigger))
                    {
                        if (trigger.GetNextFireTimeUtc() == null
                            || trigger.GetNextFireTimeUtc() > noLaterThan + timeWindow)
                        {
                            continue;
                        }
                    }


                    JobKey jobKey = trigger.JobKey;
                    IJobDetail job = this.Jobs.FindOneByIdAs<IJobDetail>(jobKey.ToBsonDocument());

                    if (job.ConcurrentExecutionDisallowed)
                    {
                        if (acquiredJobKeysForNoConcurrentExec.Contains(jobKey))
                        {
                            continue; // go to next trigger in store.
                        }
                        else
                        {
                            acquiredJobKeysForNoConcurrentExec.Add(jobKey);
                        }
                    }

                    trigger.FireInstanceId = this.GetFiredTriggerRecordId();
                    var acquired = this.Triggers.FindAndModify(
                        Query.And(
                            Query.EQ("_id", trigger.Key.ToBsonDocument()),
                            Query.EQ("State", "Waiting")),
                        SortBy.Null,
                        Update.Set("State", "Acquired")
                            .Set("SchedulerInstanceId", this.instanceId)
                            .Set("FireInstanceId", trigger.FireInstanceId));

                    if (acquired.ModifiedDocument != null)
                    {
                        result.Add(trigger);

                        if (firstAcquiredTriggerFireTime == null)
                        {
                            firstAcquiredTriggerFireTime = trigger.GetNextFireTimeUtc();
                        }
                    }

                    if (result.Count == maxCount)
                    {
                        break;
                    }
                }

                return result;
            }
        }


        public virtual void ReleaseAcquiredTrigger(IOperableTrigger trigger)
        {
            lock (lockObject)
            {
                this.Triggers.Update(
                    Query.EQ("_id", trigger.Key.ToBsonDocument()),
                    Update.Unset("SchedulerInstanceId")
                        .Set("State", "Waiting"));
            }
        }


        public virtual IList<TriggerFiredResult> TriggersFired(IList<IOperableTrigger> triggers)
        {
            lock (lockObject)
            {
                List<TriggerFiredResult> results = new List<TriggerFiredResult>();

                foreach (IOperableTrigger trigger in triggers)
                {
                    // was the trigger deleted since being acquired?
                    if (this.Triggers.FindOneByIdAs<BsonDocument>(trigger.Key.ToBsonDocument()) == null)
                    {
                        continue;
                    }
                    // was the trigger completed, paused, blocked, etc. since being acquired?
                    BsonDocument triggerState = this.Triggers.FindOneByIdAs<BsonDocument>(trigger.Key.ToBsonDocument());
                    if (triggerState["State"] != "Acquired")
                    {
                        continue;
                    }

                    ICalendar cal = null;
                    if (trigger.CalendarName != null)
                    {
                        cal = this.RetrieveCalendar(trigger.CalendarName);
                        if (cal == null)
                        {
                            continue;
                        }
                    }

                    DateTimeOffset? prevFireTime = trigger.GetPreviousFireTimeUtc();

                    // call triggered on our copy, and the scheduler's copy
                    trigger.Triggered(cal);

                    var document = trigger.ToBsonDocument();
                    document.Add("State", "Executing");
                    this.Triggers.Save(document);

                    TriggerFiredBundle bndle = new TriggerFiredBundle(this.RetrieveJob(trigger.JobKey),
                        trigger,
                        cal,
                        false, SystemTime.UtcNow(),
                        trigger.GetPreviousFireTimeUtc(), prevFireTime,
                        trigger.GetNextFireTimeUtc());

                    IJobDetail job = bndle.JobDetail;

                    if (job.ConcurrentExecutionDisallowed)
                    {
                        var jobTriggers = this.GetTriggersForJob(job.Key);
                        IEnumerable<BsonDocument> triggerKeys = jobTriggers
                            .Where(t => !t.Key.Equals(trigger.Key))
                            .Select(t => t.Key.ToBsonDocument());

                        this.Triggers.Update(
                            Query.And(
                                Query.In("_id", triggerKeys),
                                Query.EQ("State", "Waiting")),
                            Update.Set("State", "Blocked"));

                        this.Triggers.Update(
                            Query.And(
                                Query.In("_id", triggerKeys),
                                Query.EQ("State", "Paused")),
                            Update.Set("State", "PausedAndBlocked"));

                        this.BlockedJobs.Save(
                            new BsonDocument(
                                new BsonElement("_id", job.Key.ToBsonDocument())));
                    }

                    results.Add(new TriggerFiredResult(bndle));
                }
                return results;
            }
        }

        public virtual void TriggeredJobComplete(IOperableTrigger trigger, IJobDetail jobDetail,
            SchedulerInstruction triggerInstCode)
        {
            lock (lockObject)
            {
                this.ReleaseAcquiredTrigger(trigger);

                // It's possible that the job is null if:
                //   1- it was deleted during execution
                //   2- RAMJobStore is being used only for volatile jobs / triggers
                //      from the JDBC job store

                if (jobDetail.PersistJobDataAfterExecution)
                {
                    this.Jobs.Update(
                        Query.EQ("_id", jobDetail.Key.ToBsonDocument()),
                        Update.Set("JobDataMap", jobDetail.JobDataMap.ToBsonDocument()));
                }

                if (jobDetail.ConcurrentExecutionDisallowed)
                {
                    IList<Spi.IOperableTrigger> jobTriggers = this.GetTriggersForJob(jobDetail.Key);
                    IEnumerable<BsonDocument> triggerKeys = jobTriggers.Select(t => t.Key.ToBsonDocument());
                    this.Triggers.Update(
                        Query.And(
                            Query.In("_id", triggerKeys),
                            Query.EQ("State", "Blocked")),
                        Update.Set("State", "Waiting"));

                    this.Triggers.Update(
                        Query.And(
                            Query.In("_id", triggerKeys),
                            Query.EQ("State", "PausedAndBlocked")),
                        Update.Set("State", "Paused"));

                    signaler.SignalSchedulingChange(null);
                }


                this.BlockedJobs.Remove(
                    Query.EQ("_id", jobDetail.Key.ToBsonDocument()));


                if (triggerInstCode == SchedulerInstruction.DeleteTrigger)
                {
                    log.Debug("Deleting trigger");
                    DateTimeOffset? d = trigger.GetNextFireTimeUtc();
                    if (!d.HasValue)
                    {
                        d = trigger.GetNextFireTimeUtc();
                        if (!d.HasValue)
                        {
                            this.RemoveTrigger(trigger.Key);
                        }
                        else
                        {
                            log.Debug("Deleting cancelled - trigger still active");
                        }
                    }
                    else
                    {
                        this.RemoveTrigger(trigger.Key);
                        signaler.SignalSchedulingChange(null);
                    }
                }
                else if (triggerInstCode == SchedulerInstruction.SetTriggerComplete)
                {
                    this.Triggers.Update(
                        Query.EQ("_id", trigger.Key.ToBsonDocument()),
                        Update.Set("State", "Complete"));

                    signaler.SignalSchedulingChange(null);
                }
                else if (triggerInstCode == SchedulerInstruction.SetTriggerError)
                {
                    Log.Info(string.Format(CultureInfo.InvariantCulture, "Trigger {0} set to ERROR state.", trigger.Key));
                    this.Triggers.Update(
                        Query.EQ("_id", trigger.Key.ToBsonDocument()),
                        Update.Set("State", "Error"));

                    signaler.SignalSchedulingChange(null);
                }
                else if (triggerInstCode == SchedulerInstruction.SetAllJobTriggersError)
                {
                    Log.Info(string.Format(CultureInfo.InvariantCulture, "All triggers of Job {0} set to ERROR state.",
                        trigger.JobKey));
                    IList<Spi.IOperableTrigger> jobTriggers = this.GetTriggersForJob(jobDetail.Key);
                    IEnumerable<BsonDocument> triggerKeys = jobTriggers.Select(t => t.Key.ToBsonDocument());
                    this.Triggers.Update(
                        Query.In("_id", triggerKeys),
                        Update.Set("State", "Error"));

                    signaler.SignalSchedulingChange(null);
                }
                else if (triggerInstCode == SchedulerInstruction.SetAllJobTriggersComplete)
                {
                    IList<Spi.IOperableTrigger> jobTriggers = this.GetTriggersForJob(jobDetail.Key);
                    IEnumerable<BsonDocument> triggerKeys = jobTriggers.Select(t => t.Key.ToBsonDocument());
                    this.Triggers.Update(
                        Query.In("_id", triggerKeys),
                        Update.Set("State", "Complete"));

                    signaler.SignalSchedulingChange(null);
                }
            }
        }

        public virtual string InstanceId
        {
            set { this.instanceId = value; }
        }


        public virtual string InstanceName
        {
            set { this.instanceName = value; }
        }

        public int ThreadPoolSize
        {
            set { }
        }

        public long EstimatedTimeToReleaseAndAcquireTrigger
        {
            get { return 200; }
        }

        public bool Clustered
        {
            get { return true; }
        }


        public virtual Collection.ISet<string> GetPausedTriggerGroups()
        {
            return new Collection.HashSet<string>(this.PausedTriggerGroups.FindAllAs<string>());
        }
    }
}

结果如图:

Mongodb数据库:


未经允许不得转载:冰点网络 » .NET平台MongoDB下使用JobStore存储Quartz.Net的Job,Trigger数据

分享到:更多 ()

评论 抢沙发

评论前必须登录!