Tomcat uses thread pool to handle remote concurrent requests

By understanding how Tomcat handles concurrent requests, we can learn about thread pools, locks, queues, and the Unsafe class. The code discussed below comes mainly from the following classes.

JRE:

sun.misc.Unsafe
java.util.concurrent.ThreadPoolExecutor
java.util.concurrent.ThreadPoolExecutor.Worker
java.util.concurrent.locks.AbstractQueuedSynchronizer
java.util.concurrent.locks.AbstractQueuedLongSynchronizer
java.util.concurrent.LinkedBlockingQueue

tomcat:

org.apache.tomcat.util.net.NioEndpoint
org.apache.tomcat.util.threads.ThreadPoolExecutor
org.apache.tomcat.util.threads.TaskThreadFactory
org.apache.tomcat.util.threads.TaskQueue

ThreadPoolExecutor

ThreadPoolExecutor is a thread pool implementation. It manages a set of reusable worker threads, which avoids the overhead of creating and destroying a thread for every task and improves task execution efficiency.

The constructor takes the following parameters:

public ThreadPoolExecutor(
 int corePoolSize,
 int maximumPoolSize,
 long keepAliveTime,
 TimeUnit unit,
 BlockingQueue<Runnable> workQueue,
 ThreadFactory threadFactory,
 RejectedExecutionHandler handler) {
 
}

corePoolSize: the number of core threads
maximumPoolSize: the maximum number of threads
keepAliveTime: the maximum time a non-core thread may stay idle before it is terminated
unit: the time unit of keepAliveTime
workQueue: the queue that holds tasks when all core threads are busy
threadFactory: the factory used to create new threads
handler: the rejection policy, invoked when the pool can accept no more tasks (the queue is full and the thread count has reached maximumPoolSize). RejectedExecutionHandler is an interface, so you can supply a custom policy
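As a rough illustration of how the seven constructor parameters fit together, the following sketch builds a small pool with the plain JDK class. The class name PoolConstructionDemo, the helper methods, and every numeric value are assumptions chosen for the example; they are not Tomcat's settings.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConstructionDemo {

    // Builds a small pool; every number here is an arbitrary illustrative value.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize
                60L, TimeUnit.SECONDS,                // keepAliveTime and unit
                new ArrayBlockingQueue<>(10),         // workQueue (bounded in this sketch)
                Executors.defaultThreadFactory(),     // threadFactory
                new ThreadPoolExecutor.AbortPolicy()  // handler: throws RejectedExecutionException
        );
    }

    // Submits `tasks` trivial jobs and returns how many of them ran.
    static int runTasks(int tasks) {
        ThreadPoolExecutor pool = newPool();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown(); // already submitted tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runTasks(5));
    }
}
```

With five tasks, two core threads, and room for ten queued tasks, nothing is rejected, so the handler is never invoked in this run.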

How Tomcat uses ThreadPoolExecutor for HTTP requests

After receiving a remote request, Tomcat hands it to this thread pool as a separate task by calling execute(Runnable).

Initialization

org.apache.tomcat.util.net.NioEndpoint

When NioEndpoint is initialized, a thread pool is created

public void createExecutor() {
    internalExecutor = true;
    TaskQueue taskqueue = new TaskQueue();
    // TaskQueue is unbounded by default, so tasks can almost always be queued
    // and the rejection handler is rarely triggered.
    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS, taskqueue, tf);
    taskqueue.setParent((ThreadPoolExecutor) executor);
}

When the thread pool is created, prestartAllCoreThreads() is called to create and start the core worker threads:

public int prestartAllCoreThreads() {
 int n = 0;
 while (addWorker(null, true))
  ++n;
 return n;
 }

Once the number of workers reaches corePoolSize, addWorker(null, true) returns false and no more worker threads are created.
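The effect of prestarting can be observed with the JDK's own java.util.concurrent.ThreadPoolExecutor, which exposes the same prestartAllCoreThreads() method. The sketch below is only an illustration; the class name PrestartDemo and the pool sizes are assumptions.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartDemo {

    // Returns {workers before prestart, workers started, workers after prestart}.
    static int[] prestart(int corePoolSize, int maxPoolSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        int before = pool.getPoolSize();              // no workers exist yet
        int started = pool.prestartAllCoreThreads();  // loops on addWorker(null, true)
        int after = pool.getPoolSize();               // now equal to corePoolSize
        pool.shutdown();
        return new int[] {before, started, after};
    }

    public static void main(String[] args) {
        int[] r = prestart(3, 8);
        System.out.println(r[0] + " -> started " + r[1] + " -> " + r[2]);
    }
}
```

The loop stops at corePoolSize even though maximumPoolSize is larger, because addWorker is called with core set to true.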

Submitting tasks to the queue

Every time a client sends an HTTP request, a processing task is submitted.

The task is placed in the queue, and a worker later takes it from the queue and runs it. The following code puts the task into the queue.

ThreadPoolExecutor.execute(Runnable) submits the task:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // Is the number of workers less than the number of core threads? In Tomcat, the core
    // threads are prestarted, so this condition is generally false and addWorker is not called.
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // workQueue.offer(command) adds the task to the queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

When Tomcat processes remote HTTP requests, workQueue.offer(command) is where the task submission is completed.

workQueue.offer

TaskQueue extends LinkedBlockingQueue, which implements BlockingQueue. When TaskQueue enqueues a task, it delegates to LinkedBlockingQueue.offer, so the code that actually stores the task is:

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node); // Add the task to the queue here
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

// Add the task to the queue
/**
 * Links node at end of queue.
 *
 * @param node the node
 */
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node; // linked-list append, equivalent to: last.next = node; last = node;
}

After that, the worker takes over: in its run method, it obtains the task submitted here by calling getTask() and executes it.

How does the thread pool handle newly submitted tasks?

After the workers have been added, tasks are submitted. Because the number of workers has already reached corePoolSize, each task is put into the queue, and each worker's run method loops, taking tasks from the queue whenever it is not empty.
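The queueing behaviour can be made visible with a deliberately tiny pool. In this sketch (the class name QueueingDemo and the pool size of 1 are assumptions for illustration), the single core worker is kept busy, so later tasks are stored in the work queue instead of starting new threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueingDemo {

    // Returns {pool size, queue length} observed while the only core thread is busy.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The first task occupies the single core worker until it is released.
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            // The worker count equals corePoolSize, so these tasks go through workQueue.offer().
            pool.execute(() -> { });
            pool.execute(() -> { });
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] {-1, -1};
        } finally {
            release.countDown(); // let the worker finish and drain the queue
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println("pool size = " + r[0] + ", queued tasks = " + r[1]);
    }
}
```

Once the first task is released, the same worker loops in getTask() and runs the two queued tasks one after the other.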

Worker run method:

/** Delegates main run loop to outer runWorker */
 public void run() {
  runWorker(this);
 }

Looping over the tasks in the queue

The loop in runWorker(Worker):

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) { // Loop to get tasks from the queue
            w.lock(); // Acquire the lock
            try {
                // Pre-execution processing
                beforeExecute(wt, task);
                Throwable thrown = null;
                // The task from the queue starts executing
                // (exception handling around run() is omitted here)
                task.run();
                // Post-execution processing
                afterExecute(task, thrown);
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock(); // Release the lock
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

task.run() executes the task

Use of locks

The work queue used by ThreadPoolExecutor relies on locks to guarantee two things:
1. While one thread is adding a task to the queue, no other thread can add a task at the same time (putLock).
2. While one thread is taking a task from the queue, no other thread can take a task at the same time (takeLock).

Lock when adding a task to the queue

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Acquire the lock
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock(); // Release the lock
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

Lock when taking a task from the queue

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        // ... omitted (state checks and the computation of timed)
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take(); // Get a task from the queue
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly(); // Acquire the lock
    try {
        while (count.get() == 0) {
            notEmpty.await(); // If there is no task in the queue, wait
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock(); // Release the lock
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
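The lock-and-condition pattern in offer() and take() can be reduced to a very small sketch. The TinyBlockingQueue class below is hypothetical and much simpler than LinkedBlockingQueue: it is unbounded and uses a single lock instead of separate putLock and takeLock, purely to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TinyBlockingQueue<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    // Non-blocking insert, guarded by the lock.
    public boolean offer(E e) {
        lock.lock();
        try {
            items.addLast(e);
            notEmpty.signal(); // wake one waiting consumer, if any
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Blocking removal: waits on the condition while the queue is empty.
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                notEmpty.await(); // releases the lock while waiting
            }
            return items.removeFirst();
        } finally {
            lock.unlock();
        }
    }
}

class TinyBlockingQueueDemo {

    // A consumer thread blocks in take() until the caller offers a value.
    static String handOff(String value) {
        TinyBlockingQueue<String> queue = new TinyBlockingQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.offer(value);
        try {
            consumer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("consumer received: " + handOff("task-1"));
    }
}
```

The while loop around await() matters: a woken thread re-checks the condition before removing an element, just as take() re-checks count.get() == 0.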

volatile

In concurrent code, the volatile keyword is very commonly applied to member variables.

Its main purpose is visibility: once one thread writes a volatile variable, later reads of that variable by other threads see the new value. It does not make compound operations such as count++ atomic.
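A common use of this visibility guarantee is a stop flag shared between threads. The following is a minimal sketch; the class name VolatileFlagDemo and the field name running are assumptions made for the example.

```java
class VolatileFlagDemo {

    // Without volatile, the worker thread might never observe the update to this flag.
    private static volatile boolean running = true;

    // Starts a spinning worker, clears the flag, and reports whether the worker stopped.
    static boolean stopWorker() {
        running = true;
        Thread worker = new Thread(() -> {
            while (running) {
                Thread.yield(); // spin until another thread clears the flag
            }
        });
        worker.start();
        running = false; // this write becomes visible to the worker
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + stopWorker());
    }
}
```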

sun.misc.Unsafe and high concurrency

The thread pool and the synchronizers it is built on rely heavily on the Unsafe class. It provides low-level primitives for high-concurrency code, such as atomic CAS (compare-and-swap) operations and blocking and waking up threads.

sun.misc.Unsafe is a low-level, internal JDK class that is not intended for direct use in application code.

Atomic data manipulation

The java.util.concurrent.locks.AbstractQueuedSynchronizer class uses the following code to update its state atomically:

protected final boolean compareAndSetState(int expect, int update) {
 // See below for intrinsics setup to support this
 return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
 }

The corresponding code in the Unsafe class:

// At the lowest level this is a native method, implemented in C++
/**
 * Atomically update Java variable to <tt>x</tt> if it is currently
 * holding <tt>expected</tt>.
 * @return <tt>true</tt> if successful
 */
public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);

In short, the method updates a value atomically. Here o is the object, and offset is the memory offset within o of the field that you want to modify.
To stay correct under high concurrency, the update must be based on the value you actually read, and no other thread may change the field between your read and your write.

If the value currently in memory equals expected, the field is set to x and true is returned to indicate that the update succeeded.

Otherwise, the value in memory differs from expected, which means another thread has modified it in the meantime. The field is not updated, and false is returned to tell the caller that the atomic update failed.
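The same compare-and-swap semantics are available through the public java.util.concurrent.atomic.AtomicInteger class, which is a safer way to experiment than calling Unsafe directly. The sketch below uses that substitute; the class name CasDemo and its helper methods are assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasDemo {

    // Lock-free increment: retry until no other thread changed the value in between.
    static int casIncrement(AtomicInteger value) {
        for (;;) {
            int expected = value.get();
            int updated = expected + 1;
            if (value.compareAndSet(expected, updated)) {
                return updated;
            }
            // CAS failed: another thread modified the value, so read it again and retry.
        }
    }

    // Runs `threads` threads that each increment `perThread` times; returns the final value.
    static int concurrentCount(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    casIncrement(counter);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        AtomicInteger v = new AtomicInteger(5);
        System.out.println(v.compareAndSet(5, 6));     // true: memory held the expected value 5
        System.out.println(v.compareAndSet(5, 7));     // false: memory now holds 6, not 5
        System.out.println(concurrentCount(4, 10000)); // 40000: no increments are lost
    }
}
```

The retry loop is the usual way to build a larger atomic operation from a single CAS: a failed attempt costs only another read, and no thread ever holds a lock.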

Blocking and waking up threads

A worker in the thread pool loops to take tasks from the queue. If the queue is empty, the worker does not exit; it waits inside its run method. The code calls notEmpty.await() to suspend the worker thread and place it in a queue of waiting threads (distinct from the task queue). When a new task is added, notEmpty.signal() wakes one of those threads.

Underneath, these calls rely on two native methods of Unsafe.

unsafe.park() blocks the current thread:
public native void park(boolean isAbsolute, long time);

unsafe.unpark() wakes up the given thread:
public native void unpark(Object thread);

The two operations are paired. When a thread blocks, it is first placed in the wait queue and then parked. When it is signaled, it is taken out of the wait queue, and unsafe.unpark(thread) wakes that specific thread.
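The park and unpark pairing can be tried out through java.util.concurrent.locks.LockSupport, the public wrapper around these Unsafe methods. This is a minimal sketch; the class name ParkDemo and the two flags are assumptions for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {

    // Parks a thread until the caller grants permission, then reports whether it resumed.
    static boolean parkAndWake() {
        AtomicBoolean permitted = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so always re-check the condition in a loop.
            while (!permitted.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();
        permitted.set(true);
        LockSupport.unpark(waiter); // wakes the waiter, or makes its next park() return at once
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter resumed: " + parkAndWake());
    }
}
```

Unlike wait and notify, unpark names the exact thread to wake, which is why the synchronizer keeps its own linked list of waiting threads.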

java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject class

Waiting threads are stored in a linked list

// Add a blocked thread
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node; // Put the newly blocked thread at the end of the linked list
    return node;
}

// Take out a blocked thread
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter; // The first blocked thread in the linked list
    if (first != null)
        doSignal(first);
}

// After getting it, wake up this thread (abridged)
final boolean transferForSignal(Node node) {
    LockSupport.unpark(node.thread);
    return true;
}

// java.util.concurrent.locks.LockSupport
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}
