параллельность + синхронизации (примеры)

Вопросы написания собственного программного кода (на любых языках)

Модератор: Olej

Аватара пользователя
Olej
Писатель
Сообщения: 21336
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 11 дек 2014, 20:48

Olej писал(а): С Go я ошибся!
Не исключено, что я так же ошибаюсь и для других исполняющих систем ... той же Java, например.
Я просто здесь складываю то, что реально наблюдаю ... чтобы не потерялось.

Хотя относительно Python - это известно и понятно, что происходит ... и непонятно, почему для других интерпретирующих систем это могло бы быть иначе...

Аватара пользователя
Olej
Писатель
Сообщения: 21336
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 12 дек 2014, 01:29

Olej писал(а): Интересно (будет) посмотреть что там в Boost реализации класса boost::thread ... но что-то (поверхностное изучение boost::thread ;-) ) мне подсказывает, что никаких абсолютно отличий там нет.
Судя по хронологии, реализацию C++11 просто списали один в один с реализации Boost (как, впрочем, было и с STL до его включения в C++ стандарт).
Это так и есть, потому что если взглянуть на документацию последней редакции Boost 1.57
Last revised: October 30, 2014 at 10:20:47 GMT
То там находим:
Conformance and Extension
C++11 standard Thread library
C++14 standard Thread library - accepted changes
C++14 TS Extensions for Concurrency V1
C++1z TS Concurrency - On going proposals
The Boost.Thread library was originally written and designed by William E. Kempf (version 1).

Anthony Williams version (version 2) was a major rewrite designed to closely follow the proposals presented to the C++ Standards Committee, in particular N2497, N2320, N2184, N2139, and N2094

Vicente J. Botet Escriba started (version 3) the adaptation to comply with the accepted Thread C++11 library (Make use of Boost.Chrono and Boost.Move) and the Shared Locking Howard Hinnant proposal except for the upward conversions. Some minor non-standard features have been added also as thread attributes, reverse_lock, shared_lock_guard.
Так что на сегодня реализации thread:: в Boost и в стандартах C++ - это практически одно и тоже, и делается одними и теми же людьми.

Аватара пользователя
Olej
Писатель
Сообщения: 21336
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 12 дек 2014, 02:02

Olej писал(а):

Код: Выделить всё

[Olej@modules SpeedThread.4]$ ./CSpeed1 1 -3
число потоков 1
процессоров в системе 4
1341953458
операций 1341953458 : 1 * 1341953458 [+/-0.00%]
Вот это тоже полезное и интересное место: кто и как определяет число доступных процессоров:

- C & C++

Код: Выделить всё

#include <unistd.h>
#include <iostream> 
#include <thread>        
using namespace std;

int main() {
   cpu_set_t mask;
   sched_getaffinity( 0, sizeof( cpu_set_t ), &mask );
   cout << "присутствует процессоров в системе "
        << thread::hardware_concurrency() << endl
        << "число доступных процессоров "
        << CPU_COUNT( &mask ) << endl;
   return 0;
}
Как понятно, 2-й способ это то как это определяет библиотека C, Linux API, а 1-й - как определяется в thread:: C++11 & Boost. 2-й (C) способ определяется на основе афинити-маски процесса, а 1-й ... я не знаю на основе чего ;-) :

Код: Выделить всё

[Olej@modules SpeedThread.5]$ ./procnum_cc 
присутствует процессоров в системе 4
число доступных процессоров 4
[Olej@modules SpeedThread.5]$ taskset -c 2 ./procnum_cc 
присутствует процессоров в системе 4
число доступных процессоров 1
[Olej@modules SpeedThread.5]$ taskset -c 0-3 ./procnum_cc 
присутствует процессоров в системе 4
число доступных процессоров 4
[Olej@modules SpeedThread.5]$ taskset -c 1,2 ./procnum_cc 
присутствует процессоров в системе 4
число доступных процессоров 2
И определяют они совершенно разные вещи!
То же на Go:

Код: Выделить всё

package main
import "fmt"
import "runtime"

func main() {
   fmt.Printf( "число доступных процессоров %v\n", runtime.NumCPU() )
}

Код: Выделить всё

[Olej@modules SpeedThread.4]$ taskset -c 0 ./procnum_go
число доступных процессоров 1
То, что видят C & Go - это то что нужно.
А то, что C++ & Boost - это гипотетически, то что присутствует в железе, но к вашему приложению касательства не имеет.
В Boost 1.57 (2014г.) mпоявился такой метод (в дополнение к thread::hardware_concurrency()):

Код: Выделить всё

static unsigned physical_concurrency() noexcept;

Но что это означает я не знаю - для этого нужно бы устанавливать Boost из исходников.

Аватара пользователя
Olej
Писатель
Сообщения: 21336
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 12 дек 2014, 22:16

Olej писал(а): Go легко создаёт конкурирующих и 1000 и 5000 сопрограмм, ...
Т.е. сопрограммы Go - это совсем не нативные потоки ядра!
На то, что сопрограммы Go (goroutine) не есть pthread_t наталкивает ещё и то обстоятельство, что если убрать из кода задачи, показанного выше, в завершение каждого цикла инкремента счётчика:

Код: Выделить всё

           time.Sleep( 1 )            // 1 ns, но дать другим работать
Это минимальное wait-состояние (на 1 ns, но в это трудно поверить ;-) ), выполняющее фактически yeld(), переключение потока.
Но это не корпоративная многозадачность, потому что без этой строки:
- 1-й инкрементирующий поток сразу насколько сильно вытесняет даже запускающий с его sleep(), что он завершит задачу не за 1 сек., а за 6 или 16 сек. ...
- при этом результаты счёта по инкрементирующим потокам будут как-то так:

Код: Выделить всё

60000 5000 1000 600 0

Т.е. параллельные потоки всё-так вытесняются, но это вытесняющая многозадачность какая-то странная ... нужно разбираться.
Очень активные и прожорливые сопрограммы. ;-)

И ещё одна их особенность, что, как я понимаю, сопрограммы не имеют такого понятия как приоритет - они принципиально симметричные.
В теоретических статьях по параллелизму высказывались такие мнения, что параллелизм может быть надёжен и предсказуем только в симметричных условиях, при отсутствии приоритетов. Но это - только одна точка зрения из возможных.

Аватара пользователя
Olej
Писатель
Сообщения: 21336
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 12 дек 2014, 22:26

Olej писал(а): И ещё одна их особенность, что, как я понимаю, сопрограммы не имеют такого понятия как приоритет - они принципиально симметричные.
В теоретических статьях по параллелизму высказывались такие мнения, что параллелизм может быть надёжен и предсказуем только в симметричных условиях, при отсутствии приоритетов. Но это - только одна точка зрения из возможных.
В отношении приоритетов одна интересная мелочь наблюдается и в Java:
- указывается, что приоритет потока (может быть изменён методом setPriority()) находится от MIN_PRIORITY=1 до MAX_PRIORITY=10...
- дефаултное значение NORM_PRIORITY=5

Понятно, что хотя потоки Java в Linux JVM - это pthread_t, но эти приоритеты никакого отношения к реальным приоритетам Linux не имеют (к их диапазону). Это какой-то внутренний счёт JVM...
Более того, не совсем понятно вообще, как эти setPriority() начнут себя вести в Linux с его ... очень специфичной дисциплиной шедулирования потоков ядром ... которую то и приоритетной можно считать с большой натяжкой.
Хотя и в Windows ситуация будет не лучше ... а то и хуже. ;-)
По настоящему детерминированным поведением приоритетных потоков можно было бы считать только в realtime OS, например QNX... но тот доживает свои последние дни ;-) ... правда эти дни он доживает уже лет 7. :lol:

Аватара пользователя
Olej
Писатель
Сообщения: 21336
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 13 дек 2014, 22:23

Olej писал(а): Хотя относительно Python - это известно и понятно, что происходит ... и непонятно, почему для других интерпретирующих систем это могло бы быть иначе...
А вот и Python ;-)
То, что у них называется низкоуровневой моделью потоков, в Python 2 - это модуль _thread, в Python 3 - это модуль thread.

Код: Выделить всё

#!/usr/bin/python -O
# -*- coding: utf-8 -*-
import sys
import time
try:
    import thread as thred
except ImportError:
    import _thread as thred
from multiprocessing import cpu_count
import math
# низкоуровневая _thread модель:

def RunCounter( arr, num ) :
    global wait, active, finish
    lock = thred.allocate_lock()         # блокировка доступа к числу активных потоков
    lock.acquire(); active += 1; lock.release()
    counter = 0
    while not finish : counter += 1
    lock.acquire(); active -= 1; lock.release()
    arr[ num ] = counter
    ret = counter
    if 0 == active : wait.release()      # если это последний поток...
    return

nthr = 2                                 # число потоков
sec = 1                                  # интервал выполнения
debug = False
wait = thred.allocate_lock()             # блокировка ожидания завершения всех потоков
active = 0                               # число активно выполняющихся потоков
finish = False
try :
    if len( sys.argv ) > 1 :
        nthr = int( sys.argv[ 1 ] )
        if len( sys.argv ) > 2 :
            sec = int( sys.argv[ 2 ] )
            if sec < 0 :                 # признак отладочной диагностики
                sec = -sec
                debug = True
except ValueError :
    print( 'запуск: python[3] {} [потоков] [[-]секунд]'.format( sys.argv[ 0 ] ) )
    sys.exit( 1 )
if( debug ) :
    print( 'число потоков {}'.format( nthr ) )
    print( 'число доступных процессоров {}'.format( cpu_count() ) )
res = [ 0 for x in range( nthr ) ]       # результирующие счётчики
wait.acquire()                           # предварительный захват блокировки
for i in range( nthr ) :                 # запуск всех потоков
    thred.start_new_thread( RunCounter, ( res, i ) )
time.sleep( sec )
finish = True
wait.acquire()                           # ожидание завершения всех потоков
if debug :
    msg = ''
    for i in range( nthr ) :
        msg += '{} '.format( res[ i ] )
    print( '{}'.format( msg ) )
sum1, sum2, sq, md = 0.0, 0.0, 0.0, 0.0  # сбор статистики
for i in range( nthr ) :
    md = float( res[ i ] )
    sum1 += md
    sum2 += md * md;
md = sum1 / nthr;                        # среднее
sq = math.sqrt( sum2 / nthr - md * md ); # СКО
print( 'операций {:.0f} : {} * {:.0f} [+/-{:.2f}%]'
       .format( sum1 , nthr, md, 100. * sq / md ) )
sys.exit( 0 )
Здесь потоки сильно синхронизированы, чего я тщательно избегал в C & C++, поэтому на абсолютные цифры не смотрим:

Код: Выделить всё

[Olej@modules SpeedThread.5]$ python --version
Python 2.7.5

Код: Выделить всё

[Olej@modules SpeedThread.6]$ ./PylSpeed.py 1 3
операций 62239077 : 1 * 62239077 [+/-0.00%]
[Olej@modules SpeedThread.6]$ ./PylSpeed.py 2 3
операций 36115823 : 2 * 18057912 [+/-3.14%]
[Olej@modules SpeedThread.6]$ ./PylSpeed.py 3 3
операций 31657608 : 3 * 10552536 [+/-4.80%]
[Olej@modules SpeedThread.6]$ ./PylSpeed.py 4 3
операций 33825397 : 4 * 8456349 [+/-13.98%]
[Olej@modules SpeedThread.6]$ ./PylSpeed.py 5 3
операций 32277395 : 5 * 6455479 [+/-12.13%]
[Olej@modules SpeedThread.6]$ ./PylSpeed.py 6 3
операций 33007886 : 6 * 5501314 [+/-6.39%]
То, о чём и говорилось: 2 потока (по сравнению с простым последовательным выполнением) не увеличивает производительность, а уменьшает её в 2 раза!
Это плата за глобальную блокировку (Global Interpreter Lock — GIL) в интерпретаторе Python. И потоки здесь переключает не Linux, а интерпретатор Python.

В Python 3 обещали улучшить и избавиться от GIL:

Код: Выделить всё

[Olej@modules SpeedThread.5]$ python3 --version
Python 3.3.2

Код: Выделить всё

[Olej@modules SpeedThread.6]$ python3 ./PylSpeed.py 1 3
операций 40164416 : 1 * 40164416 [+/-0.00%]
[Olej@modules SpeedThread.6]$ python3 ./PylSpeed.py 2 3
операций 38617194 : 2 * 19308597 [+/-0.37%]
[Olej@modules SpeedThread.6]$ python3 ./PylSpeed.py 3 3
операций 42167265 : 3 * 14055755 [+/-30.91%]
[Olej@modules SpeedThread.6]$ python3 ./PylSpeed.py 4 3
операций 37030254 : 4 * 9257564 [+/-35.74%]
[Olej@modules SpeedThread.6]$ python3 ./PylSpeed.py 5 3
операций 40484406 : 5 * 8096881 [+/-32.90%]
Но исправили только в том смысле, что при наличии 4-х процессоров производительность многопоточной задачи в Python 3 не зависит от числа потоков. Т.е. масштабирования на N процессоров - никакого!
Вложения
PylSpeed.py
(2.69 КБ) 378 скачиваний

Аватара пользователя
Olej
Писатель
Сообщения: 21336
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 14 дек 2014, 02:06

Olej писал(а): А вот и Python ;-)
То, что у них называется низкоуровневой моделью потоков, в Python 2 - это модуль _thread, в Python 3 - это модуль thread.
И ещё вариант на Python.
Только теперь используя высокоуровневый модуль threading, то, что более привычная техника, и что на Python обычно и понимают как многопоточность.
Теперь тот же код компактнее и понятнее:

Код: Выделить всё

#!/usr/bin/python -O
# -*- coding: utf-8 -*-
import sys
import threading
import time
from multiprocessing import cpu_count
import math
# высокоуровневая threading модель:

def RunCounter( arr, num ) :
    global finish
    counter = 0
    while not finish : counter += 1
    arr[ num ] = counter
    return

nthr = 2                                 # число потоков
sec = 1                                  # интервал выполнения
debug = False
finish = False
try :
    if len( sys.argv ) > 1 :
        nthr = int( sys.argv[ 1 ] )
        if len( sys.argv ) > 2 :
            sec = int( sys.argv[ 2 ] )
            if sec < 0 :                 # признак отладочной диагностики
                sec = -sec
                debug = True
except ValueError :
    print( 'запуск: python[3] {} [потоков] [[-]секунд]'.format( sys.argv[ 0 ] ) )
    sys.exit( 1 )
if( debug ) :
    print( 'число потоков {}'.format( nthr ) )
    print( 'число доступных процессоров {}'.format( cpu_count() ) )
res = [ 0 for x in range( nthr ) ]       # результирующие счётчики
threads = []
for i in range( nthr ) :                 # создание и запуск потоков
    t = threading.Thread( target=RunCounter, args=( res, i ) )
    threads.append( t )
    t.setDaemon( True )
    t.start()
time.sleep( sec )
finish = True
for i in range( nthr ) :                  # ожидание завершения всех потоков
    threads[ i ].join()
if debug :
    msg = ''
    for i in range( nthr ) :
        msg += '{} '.format( res[ i ] )
    print( '{}'.format( msg ) )
sum1, sum2, sq, md = 0.0, 0.0, 0.0, 0.0  # сбор статистики
for i in range( nthr ) :
    md = float( res[ i ] )
    sum1 += md
    sum2 += md * md;
md = sum1 / nthr;                        # среднее
sq = math.sqrt( sum2 / nthr - md * md ); # СКО
print( 'операций {:.0f} : {} * {:.0f} [+/-{:.2f}%]'
       .format( sum1 , nthr, md, 100. * sq / md ) )
sys.exit( 0 )
Только от этого поведение его не меняется ;-) :

Код: Выделить всё

[Olej@modules SpeedThread.6]$ python ./PyhSpeed.py 1 3
операций 62145962 : 1 * 62145962 [+/-0.00%]
[Olej@modules SpeedThread.6]$ python ./PyhSpeed.py 2 3
операций 37828287 : 2 * 18914144 [+/-26.83%]
[Olej@modules SpeedThread.6]$ python ./PyhSpeed.py 3 3
операций 31786831 : 3 * 10595610 [+/-10.86%]
[Olej@modules SpeedThread.6]$ python ./PyhSpeed.py 4 3
операций 34229307 : 4 * 8557327 [+/-5.42%]
[Olej@modules SpeedThread.6]$ python ./PyhSpeed.py 5 3
операций 32266502 : 5 * 6453300 [+/-5.64%]

Код: Выделить всё

[Olej@modules SpeedThread.6]$ python3 ./PyhSpeed.py 1 3
операций 40678130 : 1 * 40678130 [+/-0.00%]
[Olej@modules SpeedThread.6]$ python3 ./PyhSpeed.py 2 3
операций 46321313 : 2 * 23160656 [+/-0.76%]
[Olej@modules SpeedThread.6]$ python3 ./PyhSpeed.py 3 3
операций 41854571 : 3 * 13951524 [+/-22.93%]
[Olej@modules SpeedThread.6]$ python3 ./PyhSpeed.py 4 3
операций 34591984 : 4 * 8647996 [+/-23.28%]
Вложения
PyhSpeed.py
(2.06 КБ) 359 скачиваний

Poropunenko

Re: #subject

Непрочитанное сообщение Poropunenko » 14 дек 2014, 11:51

“Дорогу одолеет идущий”. Желаю вам ни когда не останавливаться и быть творческой личностью – вечно!

Аватара пользователя
Olej
Писатель
Сообщения: 21336
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: #subject

Непрочитанное сообщение Olej » 14 дек 2014, 13:05

Poropunenko писал(а):“Дорогу одолеет идущий”. Желаю вам ни когда не останавливаться и быть творческой личностью – вечно!
А я вам желаю научиться в новом году писать никогда слитно.
Букварь вам в помощь! :lol:

Аватара пользователя
Olej
Писатель
Сообщения: 21336
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 14 дек 2014, 15:29

Olej писал(а): И ещё вариант на Python.
А теперь об масштабировании на уровне параллельных процессов.
Делать это на C, C++ или Java - довольно бессмысленно, а главное - утомительно: передавать численные данные после вычислений в родительский процесс для статистики - это нужно делать через IPC, и не так просто...
Другое дело - Python ;-) : здесь и а). потоки в смысле масштабирования бессмысленны, и б). очень просто организовать IPC готовыми пакетами Python:

Код: Выделить всё

#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import time
import sys
import signal
from multiprocessing import cpu_count, Value, Array
import math

def handler( signum, frame ):               # обработчик сигнала SIGUSR1
    global finish
    finish = True

nthr = 2                                    # число параллельых процессов
sec = 1                                     # интервал выполнения
debug = False
finish = False                              # флаг завергшения
try :
    if len( sys.argv ) > 1 :
        nthr = int( sys.argv[ 1 ] )
        if len( sys.argv ) > 2 :
            sec = int( sys.argv[ 2 ] )
            if sec < 0 :                    # признак отладочной диагностики
                sec = -sec
                debug = True
except ValueError :
    print( 'запуск: python[3] {} [потоков] [[-]секунд]'.format( sys.argv[ 0 ] ) )
    sys.exit( 1 )
if( debug ) :
    print( 'число потоков {}'.format( nthr ) )
    print( 'число доступных процессоров {}'.format( cpu_count() ) )
counter = 0
childs = [] 
res = Array( 'i', range( nthr ) )           # массив результатов в shared memory
for i in range( nthr ) :                    # создание и запуск исполняющих процессов
    try :
        pid = os.fork();                    # запуск новых процессов 
    except :
        print( 'error: create child process' ); sys.exit( 33 )
    if pid == 0 :                           # порождённый процесс
        signal.signal( signal.SIGUSR1, handler )
        while not finish : counter += 1 
        res[ i ] = counter
        sys.exit( 0 )
    if pid > 0 :                            # родительский процесс
        childs.append( pid )
time.sleep( sec )
for pid in childs :  os.kill( pid, signal.SIGUSR1 )
for pid in childs : pid, status = os.wait() # ожидание завершения всех процессов
if debug :
    msg = ''
    for i in range( len( res ) ) : msg += '{} '.format( res[ i ] )
    print( '{}'.format( msg ) )
sum1, sum2, sq, md = 0.0, 0.0, 0.0, 0.0     # сбор статистики
for i in range( len( res ) ) :
    md = float( res[ i ] )
    sum1 += md
    sum2 += md * md;
md = sum1 / nthr;                           # среднее
sq = math.sqrt( sum2 / nthr - md * md );    # СКО
print( 'операций {:.0f} : {} * {:.0f} [+/-{:.2f}%]'
       .format( sum1 , nthr, md, 100. * sq / md ) )
sys.exit( 0 )
Это сделано, пока, специально на fork(), так что под Windows такое не покатит ;-)
Родительский процесс синхронизирует порождённые по времени ("пора заканчивать") сигналом UNIX SIGUSR1, а данные обратно возвращаются через массив в shared memory.

Код: Выделить всё

[Olej@modules SpeedThread.6]$ ./PyfSpeed.py 1 3
операций 38981259 : 1 * 38981259 [+/-0.00%]
[Olej@modules SpeedThread.6]$ ./PyfSpeed.py 2 3
операций 72247766 : 2 * 36123883 [+/-0.03%]
[Olej@modules SpeedThread.6]$ ./PyfSpeed.py 3 3
операций 83464338 : 3 * 27821446 [+/-15.03%]
[Olej@modules SpeedThread.6]$ ./PyfSpeed.py 3 3
операций 83756178 : 3 * 27918726 [+/-17.73%]
[Olej@modules SpeedThread.6]$ ./PyfSpeed.py 4 3
операций 92895407 : 4 * 23223852 [+/-1.04%]
[Olej@modules SpeedThread.6]$ ./PyfSpeed.py 4 3
операций 91610136 : 4 * 22902534 [+/-0.79%]
[Olej@modules SpeedThread.6]$ ./PyfSpeed.py 5 3
операций 93099973 : 5 * 18619995 [+/-2.60%]
[Olej@modules SpeedThread.6]$ ./PyfSpeed.py 10 3
операций 92959619 : 10 * 9295962 [+/-5.71%]
[Olej@modules SpeedThread.6]$ ./PyfSpeed.py 100 3
операций 104568686 : 100 * 1045687 [+/-8.13%]
Обратим внимание на то, как в Python 2-3 параллельных процесса масштабируются лучше, чем 2-3 потока, даже в абсолютных числах производительности!

Код: Выделить всё

[Olej@modules SpeedThread.6]$ ./PylSpeed.py 2 3
операций 38733689 : 2 * 19366844 [+/-41.26%]
[Olej@modules SpeedThread.6]$ ./PylSpeed.py 3 3
операций 31688966 : 3 * 10562989 [+/-16.74%]
Вложения
PyfSpeed.py
(2.58 КБ) 367 скачиваний

Ответить

Вернуться в «Программирование»

Кто сейчас на конференции

Сейчас этот форум просматривают: нет зарегистрированных пользователей и 7 гостей