/
com/planet_ink/coffee_mud/Abilities/Common/
com/planet_ink/coffee_mud/Abilities/Diseases/
com/planet_ink/coffee_mud/Abilities/Druid/
com/planet_ink/coffee_mud/Abilities/Fighter/
com/planet_ink/coffee_mud/Abilities/Languages/
com/planet_ink/coffee_mud/Abilities/Misc/
com/planet_ink/coffee_mud/Abilities/Prayers/
com/planet_ink/coffee_mud/Abilities/Properties/
com/planet_ink/coffee_mud/Abilities/Skills/
com/planet_ink/coffee_mud/Abilities/Songs/
com/planet_ink/coffee_mud/Abilities/Spells/
com/planet_ink/coffee_mud/Abilities/Thief/
com/planet_ink/coffee_mud/Abilities/Traps/
com/planet_ink/coffee_mud/Behaviors/
com/planet_ink/coffee_mud/CharClasses/interfaces/
com/planet_ink/coffee_mud/Commands/
com/planet_ink/coffee_mud/Commands/interfaces/
com/planet_ink/coffee_mud/Common/
com/planet_ink/coffee_mud/Common/interfaces/
com/planet_ink/coffee_mud/Exits/interfaces/
com/planet_ink/coffee_mud/Items/Armor/
com/planet_ink/coffee_mud/Items/Basic/
com/planet_ink/coffee_mud/Items/CompTech/
com/planet_ink/coffee_mud/Items/MiscMagic/
com/planet_ink/coffee_mud/Items/Weapons/
com/planet_ink/coffee_mud/Items/interfaces/
com/planet_ink/coffee_mud/Libraries/
com/planet_ink/coffee_mud/Libraries/interfaces/
com/planet_ink/coffee_mud/Locales/
com/planet_ink/coffee_mud/MOBS/
com/planet_ink/coffee_mud/Races/
com/planet_ink/coffee_mud/Races/interfaces/
com/planet_ink/coffee_mud/WebMacros/
com/planet_ink/coffee_mud/WebMacros/interfaces/
com/planet_ink/coffee_mud/core/
com/planet_ink/coffee_mud/core/collections/
com/planet_ink/coffee_mud/core/interfaces/
com/planet_ink/coffee_mud/core/intermud/
com/planet_ink/coffee_mud/core/intermud/i3/
com/planet_ink/coffee_web/server/
com/planet_ink/siplet/applet/
lib/
resources/factions/
resources/fakedb/
resources/progs/autoplayer/
resources/quests/holidays/
web/
web/admin.templates/
web/admin/grinder/
web/admin/images/
web/clan.templates/
web/pub.templates/
web/pub/images/mxp/
web/pub/sounds/
web/pub/textedit/
package com.planet_ink.coffee_mud.core.threads;

import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import com.planet_ink.coffee_mud.Common.interfaces.Session;
import com.planet_ink.coffee_mud.core.CMLib;
import com.planet_ink.coffee_mud.core.Log;
import com.planet_ink.coffee_mud.core.collections.Pair;
import com.planet_ink.coffee_mud.core.collections.PairSVector;
import com.planet_ink.coffee_mud.core.collections.SLinkedList;
import com.planet_ink.coffee_mud.core.collections.STreeMap;
import com.planet_ink.coffee_mud.core.collections.STreeSet;
import com.planet_ink.coffee_mud.core.collections.UniqueEntryBlockingQueue;
import com.planet_ink.coffee_mud.core.interfaces.TickableGroup;
/*
   Copyright 2010-2016 Bo Zimmerman

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

	   http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
public class CMThreadPoolExecutor extends ThreadPoolExecutor
{
	protected Map<Runnable,Thread>	active = new HashMap<Runnable,Thread>();
	protected long  				timeoutMillis;
	protected CMThreadFactory		threadFactory;
	protected int   				queueSize = 0;
	protected String				poolName = "Pool";
	protected volatile long 		lastRejectTime = 0;
	protected volatile int  		rejectCount = 0;

	protected static class CMArrayBlockingQueue<E> extends ArrayBlockingQueue<E>{
		private static final long serialVersionUID = -4557809818979881831L;
		public CMThreadPoolExecutor executor = null;
		public CMArrayBlockingQueue(int capacity) { super(capacity);}
		@Override public boolean offer(E o)
		{
			final int allWorkingThreads = executor.getActiveCount() + super.size();
			return (allWorkingThreads < executor.getPoolSize()) && super.offer(o);
		}
	}

	public CMThreadPoolExecutor(String poolName,
								int corePoolSize, int maximumPoolSize,
								long keepAliveTime, TimeUnit unit,
								long timeoutMins, int queueSize)
	{
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new CMArrayBlockingQueue<Runnable>(queueSize));
		((CMArrayBlockingQueue<Runnable>)this.getQueue()).executor=this;
		timeoutMillis=timeoutMins * 60 * 1000;
		this.poolName=poolName;
		threadFactory=new CMThreadFactory(poolName);
		setThreadFactory(threadFactory);
		this.queueSize=queueSize;
		setRejectedExecutionHandler(new RejectedExecutionHandler()
		{
			@Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
			{
				try { executor.getQueue().put(r); } catch (final InterruptedException e) { throw new RejectedExecutionException(e); }
			}
		});
	}

	@Override
	protected void beforeExecute(Thread t, Runnable r)
	{
		synchronized(active)
		{
			if(t instanceof CMFactoryThread)
				((CMFactoryThread)t).setRunnable(r);
			active.put(r,t);
		}
	}

	@Override
	protected void afterExecute(Runnable r, Throwable t)
	{
		synchronized(active)
		{
			final Thread th=active.get(r);
			if(th instanceof CMFactoryThread)
				((CMFactoryThread)th).setRunnable(null);
			active.remove(r);
		}
	}

	@Override public int getActiveCount() { return active.size(); }

	public boolean isActive(Runnable r)
	{
		return active.containsKey(r);
	}

	public boolean isActiveOrQueued(Runnable r)
	{
		return active.containsKey(r) || getQueue().contains(r);
	}

	@Override
	public void execute(Runnable r)
	{
		try
		{
			if(this.getQueue().contains(r))
				return;
			super.execute(r);
			if((rejectCount>0)&&(System.currentTimeMillis()-lastRejectTime)>5000)
			{
				Log.warnOut(rejectCount+" Pool_"+poolName,"Threads rejected.");
				rejectCount=0;
			}
		}
		catch(final RejectedExecutionException e)
		{
			if(r instanceof CMRunnable)
			{
				final Collection<CMRunnable> runsKilled = getTimeoutOutRuns(1);
				for(final CMRunnable runnable : runsKilled)
				{
					if(runnable instanceof Session)
					{
						final Session S=(Session)runnable;
						final StringBuilder sessionInfo=new StringBuilder("");
						sessionInfo.append("status="+S.getStatus()+" ");
						sessionInfo.append("active="+S.activeTimeMillis()+" ");
						sessionInfo.append("online="+S.getMillisOnline()+" ");
						sessionInfo.append("lastloop="+(System.currentTimeMillis()-S.getInputLoopTime())+" ");
						sessionInfo.append("addr="+S.getAddress()+" ");
						sessionInfo.append("mob="+((S.mob()==null)?"null":S.mob().Name()));
						Log.errOut("Pool_"+poolName,"Timed-Out Runnable: "+sessionInfo.toString());
					}
					else
					if(runnable instanceof TickableGroup)
					{
						final TickableGroup G=(TickableGroup)runnable;
						Log.errOut("Pool_"+poolName,"Timed-Out Runnable: "+G.getName()+"-"+G.getStatus()+"\n\r");
					}
					else
						Log.errOut("Pool_"+poolName,"Timed-Out Runnable: "+runnable.toString());
				}
			}
			lastRejectTime=System.currentTimeMillis();
			rejectCount++;
		}
	}

	public Collection<CMRunnable> getTimeoutOutRuns(int maxToKill)
	{
		final LinkedList<CMRunnable> timedOut=new LinkedList<CMRunnable>();
		if(timeoutMillis<=0)
			return timedOut;
		final LinkedList<Thread> killedOut=new LinkedList<Thread>();
		synchronized(active)
		{
			try
			{
				for (final Runnable runnable : active.keySet())
				{
					if(runnable instanceof CMRunnable)
					{
						final CMRunnable cmRunnable=(CMRunnable)runnable;
						final Thread thread=active.get(runnable);
						if(cmRunnable.activeTimeMillis() > timeoutMillis)
						{
							if(timedOut.size() >= maxToKill)
							{
								CMRunnable leastWorstOffender=null;
								for(final CMRunnable r : timedOut)
								{
									if((leastWorstOffender != null)
									&&(r.activeTimeMillis() < leastWorstOffender.activeTimeMillis()))
										leastWorstOffender=r;
								}
								if(leastWorstOffender!=null)
								{
									if(cmRunnable.activeTimeMillis() < leastWorstOffender.activeTimeMillis())
										continue;
									else
										timedOut.remove(leastWorstOffender);
								}
							}
							timedOut.add(cmRunnable);
							killedOut.add(thread);
						}
					}
				}
			}
			catch(final Exception e)
			{
			}
		}
		try
		{
			while(killedOut.size()>0)
			{
				final Thread t = killedOut.remove();
				active.remove(t);
				CMLib.killThread(t,100,3);
			}
		}
		catch(final Exception e)
		{
		}
		return timedOut;
	}
}