/**
 * Private constructor: code outside this class cannot create a
 * {@code DisruptorHelper}. The static methods of this class work through
 * the shared {@code instance} reference instead.
 */
private DisruptorHelper() {
    // Intentionally empty; the per-group state is allocated in init().
}
/**
* ?????
*/
private void init(){
execute=new ExecutorService[group];
ringBuffer=new RingBuffer[group];
sequenceBarrier=new SequenceBarrier[group];
handler=new TaskEventHandler[group];
batchEventProcessor=new BatchEventProcessor[group];
////////////////??????////////////////
//?????ringbuffer?????Event
for(int i=0;i<group;i++){
ringBuffer[i] = RingBuffer.create(ProducerType.SINGLE?? TaskEvent.EVENT_FACTORY?? BUFFER_SIZE?? new YieldingWaitStrategy());
sequenceBarrier[i] = ringBuffer[i].newBarrier();
handler[i] = new TaskEventHandler();
batchEventProcessor[i] = new BatchEventProcessor<TaskEvent>(ringBuffer[i]?? sequenceBarrier[i]?? handler[i]);
ringBuffer[i].addGatingSequences(batchEventProcessor[i].getSequence());
execute[i]= Executors.newSingleThreadExecutor();
execute[i].submit(instance.batchEventProcessor[i]);
}
this.taskTimer =  Executors.newScheduledThreadPool(10?? new CustomThreadFactory("DisruptorHelper-scheduler"?? true));
inited = true;
}
/**
* ??ж????
* @param tk
*/
private void produce(int index??Task tk){
//System.out.println("index:="+index);
if(index<0||index>=group) {
System.out.println("out of group index:="+index);
return;
}
// if capacity less than 10%?? don't use ringbuffer anymore
System.out.println("capacity:="+ringBuffer[index].remainingCapacity());
if(ringBuffer[index].remainingCapacity() < BUFFER_SIZE * 0.1) {
System.out.println("disruptor:ringbuffer avaliable capacity is less than 10 %");
// do something
}else {
long sequence = ringBuffer[index].next();
//???????????ringBuffer??????к???
ringBuffer[index].get(sequence).setTask(tk);
//????????????????????
ringBuffer[index].publish(sequence);
}
}
/**
* ?????????capacity??????
* @param index
* @return
*/
private long  remainingcapacity(int index){
//System.out.println("index:="+index);
if(index<0||index>=group) {
System.out.println("out of group index:="+index);
return 0L;
}
long capacity= ringBuffer[index].remainingCapacity();
return capacity;
}
/**
 * Stops the scheduler, halts every group's event processor and shuts down
 * the executor that runs it.
 *
 * <p>Events that are still unconsumed in a ring buffer when its processor
 * is halted may be left unprocessed.
 */
private void shutdown0(){
    // Stop feeding scheduled tasks first so nothing new is published
    // while the consumers are going away.
    if (taskTimer != null) {
        taskTimer.shutdown();
    }
    for (int i = 0; i < group; i++) {
        // ExecutorService.shutdown() does not interrupt a running task, and
        // a BatchEventProcessor loops until halt() is called; without this
        // the worker thread would never terminate.
        batchEventProcessor[i].halt();
        execute[i].shutdown();
    }
}
//////////////////////////////// Public static API ////////////////////////////////
/**
* ???????
* @param tk
*/
public static void addTask(int priority??Task tk){
instance.produce(priority??tk);
}
/**
* ???????
* @param tk
* @param delay
* @param period
*/
public static void scheduleTask(int priority??Task tk??long delay??long period){
Runnable timerTask = new ScheduledTask(priority?? tk);
taskTimer.scheduleAtFixedRate(timerTask?? delay?? period?? TimeUnit.MILLISECONDS);
}
/**
* ???????
* @param tk
* @param hourse
* @param minus
* @param sec
* @return
*/
public static Runnable scheduleTask(int priority??Task tk?? int hourse??int minus??int sec)
{
Runnable timerTask = new ScheduledTask(priority?? tk);
//???2:30?????
long delay = Helper.calcDelay(hourse??minus??sec);
long period = Helper.ONE_DAY;
System.out.println("delay:"+(delay/1000)+"secs");
taskTimer.scheduleAtFixedRate(timerTask?? delay?? period?? TimeUnit.MILLISECONDS);
return timerTask;
}
//??????е??????з??
private static class ScheduledTask implements Runnable
{
private int priority;
private Task task;
ScheduledTask(int priority?? Task task)
{
this.priority = priority;
this.task = task;
}
public void run()
{
try{
instance.produce(priority??task);
}catch(Exception e){
System.out.println("catch exception in DisruptorHelper!");
}
}
}
/**
 * Returns the remaining capacity of the ring buffer of the given group.
 *
 * @param index group index; valid values are {@code 0 <= index < group}
 * @return the remaining capacity, or {@code 0} when {@code index} is out of range
 */
public static long getRemainingCapatiye(int index){
    // Delegate to the instance-level query. Calling getRemainingCapatiye
    // through 'instance' would resolve to this same static method and
    // recurse until the stack overflows.
    return instance.remainingcapacity(index);
}
/**
 * Shuts the helper down by delegating to the instance's {@code shutdown0()}.
 *
 * @throws IllegalStateException if the helper has not been initialized
 *         ({@code inited} is still {@code false})
 */
public static void shutdown(){
    if (!inited) {
        // IllegalStateException is a RuntimeException, so callers that catch
        // RuntimeException keep working; the message is now readable text.
        throw new IllegalStateException("Disruptor has not been initialized");
    }
    instance.shutdown0();
}
}