Статья не претендует на полноту, в ней описан личный опыт разработки сетевых приложений на C, node.js и golang. В каждом языке программирования свои особенности, в некоторых языках данные паттерны уже реализованы в стандартных библиотеках, но концепция везде одна.

Статья вдохновлена языком go, потому что он предоставляет удобные абстракции для работы с потоками из коробки.

Для кого это

Это статья для тех, кто не является профессиональным сетевым (бэкенд) разработчиком, но хочет написать надежный и быстрый сервис, предоставляющий API по протоколу прикладного уровня передачи данных (HTTP, WebSocket, etc. over TCP).

Паттерн WaitGroup

При создании потока, ваш код больше не исполняется последовательно

func thread() { while(true) print(1) } // child thread
create_thread(thread)
while(true) print(2) // main thread
2
1
1
1
2
2
1
2

и завершив основной поток система убивает (а может и не сразу) дочерние потоки

func work() { sleep(1m); print("child") } // child thread
create_thread(work)
print("main") // main thread
exit(0)
main

чтобы программа отработала корректно, вам необходимо дождаться завершения дочернего потока work. Для этого в библиотеках языка предусмотрены специальные wait-функции wait_thread(work), но мы рассмотрим задачу поинтереснее.

Как поступить если у нас есть N потоков и каждый поток может внутри себя создавать еще X потоков? Ответ реализовать или использовать паттерн wait group.

Заведем атомарный счетчик и перед созданием потока будем его увеличивать на 1, а в самом потоке при его завершении уменьшать на 1

atomic int wait_group = 0
func inc() { wait_group = wait_group + 1 }
func dec() { wait_group = wait_group - 1 }
func work1() {
  sleep(30s) 
  print("work1")
  dec()
}
func work2() {
  sleep(10s)
  print("work2")
  dec()
}

inc()
create_thread(work1)
int()
create_thread(work2)

wihle(wait_group != 0) {} // wait in main thread
print("exit")
work2
work1
exit

Задача выполнена, но стоит отметить один важный момент.

Инкремент нужно производить перед созданием потока, поток может завершиться быстрее, чем основной тред произведет инкремент. Подобные состояния называются состояниями гонки (race condition), их стоит избегать, но подробнее о синхронизации мы поговорим дальше.

Архитектура приложения на основе WaitGroup

Давайте для простоты описания примеров в качестве основы возьмем реализацию WaitGroup из языка Go

wg.add() // инкремент счетчика
wg.done() // декремент счетчика
wg.wait() // ожидание нуля в счетчике

Мы уже поняли, что поток — это какая-то долгая работа, давайте выразим ее интерфейсом

interface Worker {
  run(): void
}

type Workers = Worker[]

И теперь наша основной поток будет выглядеть так

// main
Workers works = [new Work1, new Work2, new Work3]
for (work of works) {
  wg.add()
  create_thread(func() { // create new thread
    work.run()
    wg.done()
  })
}
wg.wait()

Теперь вся работа с потоками сосредоточена в одном месте, мы можем как обычно продолжить писать unit тесты на наши классы. Но у нас появилась другая задача - мы захотели останавливать все воркеры, например когда пользователь хочет завершить нашу программу самостоятельно.

Корректное освобождение ресурсов

Ни для кого не секрет что корректное освобождение ресурсов очень важно, особенно это важно для сетевых приложений которые должны работать 24/7. Рассмотрим один пример сервера.

Сервер создает поток обработки на каждого клиента в котором открывает сокет. Для операционной системы unix открытый сокет это файловый дескриптор, количество открытых дескрипторов ограничено ядром операционной системы. Если мы завершим основной тред сервера, он убьет потоки, но в ядре операционной системы дескрипторы сокетов останутся открытыми (на время tcp_keepalive_time + tcp_keepalive_probes * tcp_keepalive_intvl - но это не точно 🙂 ) и вскоре мы не сможем обрабатывать новые подключения, потому что все файловые дескрипторы будут заняты. Не стоит также забывать про фрагментацию памяти и уповать на garbage collector вашего языка программирования. Небольшая утечка памяти в программе, которая работает 24/7 может иметь накопительный эффект.

Простой способ управления корректным завершением потоков

Создать в главном потоке атомарный флаг done и передавать его в дочерние потоки по ссылке

interface Worker {
  run(bool *done): void
}

class Work implements Worker {
  run(bool *done) {
    while(!done) { ... }
  }
}
class WaitSignal implements Worker {
  run(bool *done) {
    // wait SIGINT, SIGTERM os signal, see man
    *done = true
  }
}

// main
atomic bool done = false
Workers works = [new Work, new WaitSignal]
for (work of works) {
  wg.add()
  create_thread(func() { // create new thread
    work.run(&done)
    wg.done()
  })
}
wg.wait()

SIGINT, SIGTERM - это сигналы передаваемые системой при нажатии ctrl+c и kill процесса, если мы говорим о сетевом программировании то это мир unix систем и их нужно корректно обрабатывать. Тот же docker их использует для остановки процесса.

Если у вас в потоке запускается какое-то долгое вычисление передавайте этот флаг и туда

run(bool *done) {
  longFunction(done)
}

longFunction(bool *done) {
  while(!done) { ... } 
}

Долгим в сетевом мире может быть, например, процесс подключения к базе данных или любой сетевой запрос, потому что сеть — не идеальная среда, и запрос может идти несколько часов/минут, вы уже завершили программу, но все еще ждете, пока в каком-то потоке идет попытка подключения к серверу. Для решения подобных проблем необходимо вводить таймауты на операции, эту тему мы затронем далее.

Сложный способ управления корректным завершением потоков

В языке Go есть очень интересный пакет под названием Context в других языках программирования я такого не встречал, но эта абстракция очень удобна в использовании.

type Context interface {
    // Deadline returns the time when work done on behalf of this context
	// should be canceled. Deadline returns ok==false when no deadline is
	// set. Successive calls to Deadline return the same results.
	Deadline() (deadline time.Time, ok bool)
	// Done returns a channel that's closed when work done on behalf of this
	// context should be canceled.
	Done() <-chan struct{}
	Value(key interface{}) interface{}
}

метод Done() — что-то очень знакомое и напоминает наш флаг, это он и есть, но фишкой контекста является то, что он создается как копия родительского, но к родительскому добавляется ссылка на дочерние контексты, и если завершается родитель, он сначала завершает все дочерние контексты, это позволяет намного гибче контролировать потоки.
В языке Go принято, что контекст передается через DI первым аргументом функции, функция, которая принимает контекст, обязана проверять его статус и завершаться, если контекст закрыт.

func work(ctx context.Context) {
  for { // endless loop
	select {
		case <-ctx.Done():
			return
		}
	}
}

Рассмотри метод WithCancel

ctx, cancel := context.WithCancel(context.Background())

context.Background() возвращает самый первый и чистый контекст, этот метод должен вызываться ТОЛЬКО один раз в основном main потоке программы, это означает что при вызове функции cancel() все дочерние контексты закроются = закроются все потоки и все их дочерние потоки.

теперь поглядим зачем нужно это наследование

mainCtx, cancel := context.WithCancel(context.Background())
defer cancel() // вызовется при завершении работы основного потока

func work1(parentCtx context.Context) {
  work1Ctx, cancelWork1Ctx := context.WithCancel(parentCtx)
  defer cancelWork1Ctx() // defer выполняется при завершении функции work1
  go work2(work1Ctx) // создали поток с work2
  for { // делаем работу
    select {
		case <-parentCtx.Done(): // проверяем не закрыт ли родительский контекст
			return
		}
	}
  }
  // завершаем корректно работу
  // вызываются defer
}

func work2(parentCtx context.Context) {
  ... проверяем не закрыт ли родительский контекст
}

go work1(mainCtx) // создаем поток работы

// ждем например сигнала sigterm, sigint 
done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
 
// это блокирующая операция поэтому основной поток будет "висеть"
<-done
// вызываются defer

суть такая, если мы завершим work1Ctx - закроются лишь те потоки, которые слушают этот контекст, при этом кто слушает mainCtx продолжат работать, но если закрыть mainCtx - он завершит дочерний work1Ctx тем самым по цепочке корректно завершаться все потоки.

Вторая немаловажная функция context.WithTimeout - она создает контекст который автоматически закроется через заданный интервал, это позволяет нам контролировать “зависшие“ соединения, “слишком“ долгие функции, закрывать этот контекст нужно ПОСЛЕ того как совершиться долгая операция, потому что она может завершиться, раньше чем сработает таймер.

func slowOperationWithTimeout(ctx context.Context) (Result, error) {
	ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel()  // releases resources if slowOperation completes before timeout elapses
	return slowOperation(ctx)
}

Как вы понимаете slowOperation внутри так же проверяет ctx.Done() и завершиться если закроется родительский контекст, не дожидаясь срабатывания таймера.

Возможно в вашем языке есть подобные решения, а может вы захотите переписать context на свой язык, благо код пакетов go это open source https://github.com/golang/go/blob/master/src/context/context.go

Немного о мьютексах и паттерне state машины

При работе с асинхронным/многопоточным кодом рано или поздно вы столкнетесь с состоянием гонки (race condition)

int x = 0
thread_work1() {
  x = x + 5
}
thread_work2() {
  x = x + 3
}

print(x) // ?

потоки не обязательно будут работать по очереди, очередь их выполнения определяет планировщик операционной системы или самого языка (например Go). В данном примере x может быть больше исходного на 3, 5 или 8

Выше я приводил в пример атомарный тип данных, но подобная функциональность поддерживается далеко не во всех языках программирования, атомарные операции выполняются на уровне процессора, а это уже слишком низкий уровень. Поэтому для синхронизации потоков в операционных системах придумали мьютексы, семафоры и критические секции.

Задачей мьютекса является защита объекта от доступа к нему других потоков, отличных от того, который завладел мьютексом. В каждый конкретный момент только один поток может владеть объектом, защищённым мьютексом. Если другому потоку будет нужен доступ к данным, защищённым мьютексом, то этот поток блокируется до тех пор, пока мьютекс не будет освобождён. Мьютекс защищает данные от повреждения в результате асинхронных изменений (состояние гонки), однако при неправильном использовании могут порождаться другие проблемы, например, взаимная блокировка или двойной захват.

Для простоты понимания рассмотрим пример на языке Go, он имеет пакет абстракции sync.Mutex который абстрагирует нас от операционный системы под которую компилируется наша программа. Он имеет два метода Lock и Unlock

// SafeCounter is safe to use concurrently.
type SafeCounter struct {
	mu sync.Mutex
	v  map[string]int
}

// Inc increments the counter for the given key.
func (c *SafeCounter) Inc(key string) {
	c.mu.Lock()
	// Lock so only one goroutine at a time can access the map c.v.
	c.v[key]++
	c.mu.Unlock()
}

// Value returns the current value of the counter for the given key.
func (c *SafeCounter) Value(key string) int {
	c.mu.Lock()
	// Lock so only one goroutine at a time can access the map c.v.
	defer c.mu.Unlock()
	return c.v[key]
}

В такой реализации если мы в нескольких потоках вызываем c.Inc(“key“), значение будет увеличиваться на 1, потому что после вызова c.mu.Lock() - только один поток сможет работать со структурой, другие потоки “зависнут“ в ожидании, тем самым обеспечивая синхронность выполнения операции инкремента.

Важно понимать что нельзя блокировать структуру на длительное время - тем самым вы блокируете работу всей программы в целом и не получите выгоды от многопоточности.

Если же вы забудете разблокировать мьютекс, ваша программа зависнет, потому что остальные потоки будут “ждать“ его разблокировки, так что будьте внимательны 🙂

State machine (конечный автомат)

Все наши с вами компьютеры это конечные автоматы, и как вы могли заметить в примерах потоков у нас крутятся какие-то “бесконечные” циклы.

Основная идея паттерна в том, что программа может находиться в одном из нескольких состояний, которые всё время сменяют друг друга. Набор этих состояний, а также переходов между ними, предопределён и конечен. Находясь в разных состояниях, программа может по-разному реагировать на одни и те же события, которые происходят с ней.

Для примера возьмем наш encoder, он умеет читать конфиг и инициализироваться, кодировать видео и корректно завершать свою работу. Отобразим эти состояния

enum State {
  INIT
  DESTROY
}

Допустим управление осуществляется по HTTP, создадим worker реализующий сервер


func (a ApiServer) Start(ctx context.Context, sm: StateMachine) error {
  s := createServer()
  registerRoutes(sm)
  
  go func() {
		<-ctx.Done()
		// timer for close all connections
		timeoutCtx, shutdown := context.WithTimeout(ctx, 5*time.Second)
		_ = s.Shutdown(timeoutCtx)
		shutdown()
	}()
	
	if err := s.Serve(listener); err != http.ErrServerClosed {
		return err
	}
	
	//  close database connections here

	return nil
}

func registerRoutes(sm: StateMachine) {
 // упростим код
 handleFunc("/api/v1/init", () { return sm.Set(INIT) })
 handleFunc("/api/v1/state", () { return sm.State() })
 handleFunc("/api/v1/destroy", () { return sm.Set(DESTROY) })	
}

Дальше сделаем основной конечный автомат

// wokers/encoder.go

func Start(ctx context.Context, sm: StateMachine) error {
  var destroy context.CancelFunc
  
  defer func() {
    if destroy != nil {
      destroy()
    } 
  }()
  
  for { // бесконечный циксл
    select {
        case state <-sm.State():
          switch state {
            case INIT:
              var encoderCtx context.Context
              encoderCtx, destroy = context.WithCancel(ctx)
              go init(encoderCtx)
          case DESTROY:
              destroy()
          }
        case <-ctx.Done(): // проверяем родительский контекст может завершился?
          return nil
    }
  }
  
  return nil
}

Теперь соберем нашу основную программу, которая является точкой входа

// main
state := sm.New()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// запустим поток ожидания системных сигналов
go func() {
  quit := make(chan os.Signal, 1)
  signal.Notify(quit, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
  select {
	case <-ctx.Done():
		return
	case <-quit:
		cancel()
	}
}

var wg sync.WaitGroup
Workers works = [api.New(), encoder.New()]
wg.Add(len(works))
for _, work := range works {
  go func() { // create new thread
    work.Start(ctx, state)
    wg.Done()
  }()
}
wg.Wait()

Экземпляр state у нас один - он создается в главном потоке и передается дочерним потокам через указатель при создании, один из них меняет состояния, другой реагирует на их изменение.

По факту у нас получилось 6 потоков управляемые контекстами

  1. основной поток процесса ожидающий закрытия всех запущенных потоков

    1. поток ожидания сигнала завершения программы

    2. поток запуска обработки запросов по http

      1. поток ожидающий закрытия родительского потока для корректного выключения http сервера
    3. поток обработки стейт машины

      1. поток енкодера управляемый стейт машиной

Список литературы

Процессы, потоки, синхронизация

  • Таненбаум Э.С. "Современные операционные системы"
  • Рихтер Джеффри “Windows для профессионалов. Создание эффективных Win32-пpилoжeний с учетом специфики 64-разрядной версии Windows”

Паттерны

https://refactoring.guru/ru/design-patterns/state https://go.dev/blog/pipelines

WaitGroup

WaitForMultipleObjects function (synchapi.h)
WaitForSingleObject and WaitForMultipleObjects equivalent in Linux?
Boost class thread_group