By understanding how tomcat handles concurrent requests, we can understand thread pools, locks, queues, and unsafe classes. The following main code comes from java-jre: sun.misc.Unsafe java.util.concurrent.ThreadPoolExecutor java.util.concurrent.ThreadPoolExecutor.Worker java.util.concurrent.locks.AbstractQueuedSynchronizer java.util.concurrent.locks.AbstractQueuedLongSynchronizer java.util.concurrent.LinkedBlockingQueue tomcat: org.apache.tomcat.util.net.NioEndpoint org.apache.tomcat.util.threads.ThreadPoolExecutor org.apache.tomcat.util.threads.TaskThreadFactory org.apache.tomcat.util.threads.TaskQueue ThreadPoolExecutorIt is a thread pool implementation class that manages threads and reduces thread overhead. It can be used to improve task execution efficiency. The parameters in the constructor are public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { } corePoolSize is the number of core threads Application of ThreadPoolExecutor in http request in TomcatThis thread pool is used by Tomcat to process each request as a separate task after receiving a remote request. Each time execute(Runnable) is called initialization When NioEndpoint is initialized, a thread pool is created public void createExecutor() { internalExecutor = true; TaskQueue taskqueue = new TaskQueue(); //TaskQueue is an unbounded queue and can be added to all the time, so handler is equivalent to invalid TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); } When the thread pool is created, call prestartAllCoreThreads() to initialize the core worker thread and start public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; } When the number of addWorker is equal to corePoolSize, addWorker(null,true) will return false, stopping the creation of worker threads. Submitting tasks to the queue Every time a client comes to request (http), a processing task will be submitted. The worker gets the task from the queue and runs it. The following is the logic code for putting the task into the queue. ThreadPoolExecutor.execute(Runnable) submits the task: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // Is the number of workers less than the number of core threads? After initialization in Tomcat, the first condition is generally not met and addWorker will not be called. if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // workQueue.offer(command), add the task to the queue, if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } workQueue.offer(command) completes the task submission (when Tomcat processes remote http requests). workQueue.offer TaskQueue is the concrete implementation class of BlockingQueue, the actual code of workQueue.offer(command) is: public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); //Add tasks to the queue here c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; } // Add tasks to the queue/** * Links node at end of queue. * * @param node the node */ private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; //link list structurelast.next = node; last = node } After that, it is the worker's job. In the run method, the worker gets the task submitted here by calling getTask() and executes it. How does the thread pool handle newly submitted tasks? After adding a worker, submit a task. Since the number of workers reaches corePoolSize, the task will be put into the queue, and the worker's run method will loop to get the tasks in the queue (when it is not empty). Worker run method: /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } Loop through the tasks in the queue RunWorker(worker) method loop code: final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { //Loop to get tasks in the queue w.lock(); // Lock try { //Pre-execution processing beforeExecute(wt, task); //The tasks in the queue start executing task.run(); // Run post-processing afterExecute(task, thrown); finally task = null; w.completedTasks++; w.unlock(); // Release the lock} } completedAbruptly = false; finally processWorkerExit(w, completedAbruptly); } } task.run() executes the task Lock application ThreadPoolExecutor uses locks to ensure two things: Add task lock to queue public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); //Lock try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally putLock.unlock(); //Release the lock} if (c == 0) signalNotEmpty(); return c >= 0; } Get queue task lock private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? // ...omit for (;;) { try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //Get a task in the queue if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); // Lock try { while (count.get() == 0) { notEmpty.await(); //If there is no task in the queue, wait} x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); finally takeLock.unlock(); // Release the lock} if (c == capacity) signalNotFull(); return x; } volatile In concurrent scenarios, this keyword is very common to modify member variables. The main purpose is that when a public variable is modified by one thread, it is visible to other threads (real time) sun.misc.Unsafe high concurrency related classes When using the thread pool, the Unsafe class is commonly used. This class can perform some atomic CAS operations, lock threads, release threads, etc. in high concurrency. Atomic data manipulationThe java.util.concurrent.locks.AbstractQueuedSynchronizer class has code to ensure atomic operations protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } The code corresponding to the Unsafe class: //The corresponding Java bottom layer is actually the native method, corresponding to the C++ code/** * Atomically update Java variable to <tt>x</tt> if it is currently * holding <tt>expected</tt>. * @return <tt>true</tt> if successful */ public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x); The function of the method is simply to update a value to ensure atomic operation. When you want to operate a member variable That is, if the expected value is the same as the value in memory, expected == the value in memory, then the updated value is x, and true is returned to indicate that the modification is successful. Otherwise, the expected value is different from the memory value, indicating that the value has been modified by other threads and cannot be updated to x. False is returned to tell the operator that the atomic modification has failed. Blocking and waking up threadspublic native void park(boolean isAbsolute, long time); //Block the current thread The worker role of the thread pool loops to get queue tasks. If there is no task in the queue, worker.run is still waiting and will not exit the thread. The code uses The bottom layers are unsafe.unpark() wakes up the thread This operation is corresponding. When blocked, put the thread into the queue first. When awakened, take the blocked thread out of the queue, and unsafe.unpark(thread) wakes up the specified thread. Store thread information through linked lists // Add a blocking thread private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; //Put the newly blocked thread at the end of the linked list return node; } // Take out a blocked thread public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; //The first blocked thread in the linked list if (first != null) doSignal(first); } //After getting it, wake up this thread final boolean transferForSignal(Node node) { LockSupport.unpark(node.thread); return true; } public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } This is the end of this article about how Tomcat uses thread pool to handle remote concurrent requests. For more information about Tomcat thread pool handling remote concurrent requests, please search for previous articles on 123WORDPRESS.COM or continue to browse the following related articles. I hope you will support 123WORDPRESS.COM in the future! You may also be interested in:
|
<<: Description of the default transaction isolation level of mysql and oracle
>>: HTML table markup tutorial (37): background image attribute BACKGROUND
Environment configuration 1: Install MySQL and ad...
Method 1: Install the plugin via npm 1. Install n...
1. Query process show processlist 2. Query the co...
In MySQL, we often use order by for sorting and l...
Table of contents linux 1. What is SWAP 2. What d...
Create a project directory mkdir php Create the f...
Uninstall tomcat9 1. Since the installation of To...
Two ways to navigate the page Declarative navigat...
<br />Some web pages may not look large but ...
The Docker package is already included in the def...
This article uses examples to illustrate the usag...
MySQL binlog is a very important log in MySQL log...
System environment: centos7.4 1. Check whether th...
Preface Every time you use Docker to start a Hado...
HTML img produces an ugly blue border after addin...