параллельность + синхронизации (примеры)

Вопросы написания собственного программного кода (на любых языках)

Модератор: Olej

Аватара пользователя
Olej
Писатель
Сообщения: 21338
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 10 дек 2014, 16:12

Olej писал(а): 2. Иметь в цикле инкремента потока логическую переменную "пора завершаться" ;-) :

Код: Выделить всё

while( finish ) count++;
Вот отсюда и сложность, которая обнаружилась выше... Но она решаемая, а эффекты - очень интересные ... в разных языках программирования. ;-)
Запускающий поток будет выполнять sleep() нужный интервал, после чего менять условие finish - потоки завершаться сами.
Вот эта модель №2, язык C, GCC:

Код: Выделить всё

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <math.h>
#include <pthread.h>

int volatile finish = 1;                // флаг завершения потока

void *RunCounter( void *res ) {         // потоковая функция
   long count = 0;
   while( finish ) count++;
   *(long*)res = count;
   return NULL;
}

int main( int argc, char* argv[] ) {
   int nthr = 2,                        // число потоков
       sec = 1,                         // интервал выполнения
       debug = 0, i;
   if( argc > 1 && atoi( argv[ 1 ] ) != 0 )
      nthr = atoi( argv[ 1 ] );
   if( argc > 2 && atoi( argv[ 2 ] ) != 0 ) {
      sec = atoi( argv[ 2 ] );
      if( sec < 0 ) {                   // признак отладочной диагностики
         sec = -sec; debug = 1;
      }
   }
   if( debug ) printf( "число потоков %d\n", nthr );
   pthread_t *tid = (pthread_t*)calloc( nthr, sizeof( pthread_t ) );
   long *res = (long*)calloc( nthr, sizeof( long ) );
   for( i = 0; i < nthr; i++ ) 
      if( pthread_create( &tid[ i ], NULL, RunCounter, (void*)( res + i ) ) != 0 )
         printf( "ошибка потока %d: %m", i ), exit( EXIT_FAILURE );
   sleep( sec );
   finish = 0;
   for( i = 0; i < nthr; i++ ) pthread_join( tid[ i ], NULL );
   free( tid );
   if( debug )
      for( i = 0; i < nthr; i++ ) 
         printf( "%lu%s", res[ i ], ( i == nthr - 1 ? "\n" : " " ) );
   double sum1 = 0, sum2 = 0, sq, md;
   for( i = 0; i < nthr; i++ ) {
      md = (double)res[ i ];
      sum1 += md;
      sum2 += md * md;
   }
   md = sum1 / nthr;                    // среднее
   sq = sqrt( sum2 / nthr - md * md );  // СКО
   printf( "операций %.0f : %d * %.0f [+/-%d%%]\n",
           sum1, nthr, md, (int)( 100. * sq / md ) ); 
   free( res );
   return( EXIT_SUCCESS );
}

Код: Выделить всё

[Olej@modules SpeedThread.3]$ time ./CSpeed1 3 2
операций 2075162530 : 3 * 691720843 [+/-14%]

real	0m2.002s
user	0m5.877s
sys	0m0.004s
2 секунды + 3 потока ... и вон сколько работы наделали ;-) - 2075162530

И то же самое, но с более детальным, отладочным выводом (отдельно сколько каждый поток наработал):

Код: Выделить всё

[Olej@modules SpeedThread.3]$ time ./CSpeed1 3 -3
число потоков 3
969958521 878056486 1231518592
операций 3079533599 : 3 * 1026511200 [+/-14%]

real	0m3.002s
user	0m8.786s
sys	0m0.027s
Вложения
CSpeed.c
(1.85 КБ) 286 скачиваний

Аватара пользователя
Olej
Писатель
Сообщения: 21338
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 11 дек 2014, 14:30

Olej писал(а): 3. После sleep() запускающего потока, выполнять в нём pthread_cancel() для каждого считающего потока.
Очень чистая идея ... но осложнённая тем, что разные реализации модели потоков придумывают (C++11, Boost) свои обёртки для pthread_t, в которых а). такие "тонкие" ;-) штучки как pthread_cancel() они забывают сделать, а б). к самому pthread_t их модели-обёртки не так просто добраться.
Напомню, что pthread_cancel() не убивает поток, а говорит, что тому "пора завершаться" ... когда он дойдёт до ближайшей точки завершаемости, поэтому сам поток перед циклическим инкрементом должен сделать что-то типа:

Код: Выделить всё

   pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL );
   pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, NULL );
Далее...
Модель №3, язык C, GCC:

Код: Выделить всё

#define _GNU_SOURCE                     // только 1-й строкой
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <math.h>
#include <pthread.h>

void *RunCounter( void *res ) {         // потоковая функция
   pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL );
   pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, NULL );
   volatile long *p = (long*)res;
   while( 1 ) *p += 1; 
   return NULL;
}

int main( int argc, char* argv[] ) {
   int nthr = 2,                        // число потоков
       sec = 1,                         // интервал выполнения
       debug = 0, i;
   if( argc > 1 ) {
      if( atoi( argv[ 1 ] ) != 0 )
         nthr = atoi( argv[ 1 ] );
      else
         printf( "запуск: %s [потоков] [[-]секунд]\n", argv[ 0 ] ),
         exit( EXIT_SUCCESS );
      if( argc > 2 && atoi( argv[ 2 ] ) != 0 ) {
         sec = atoi( argv[ 2 ] );
         if( sec < 0 ) {                // признак отладочной диагностики
            sec = -sec; debug = 1;
         }
      }
   }
   if( debug ) {
      printf( "число потоков %d\n", nthr );
      cpu_set_t mask;
      sched_getaffinity( 0, sizeof( cpu_set_t ), &mask );
      printf( "процессоров в системе %d\n", CPU_COUNT( &mask ) );
   }
   pthread_t *tid = (pthread_t*)calloc( nthr, sizeof( pthread_t ) );
   long *res = (long*)calloc( nthr, sizeof( long ) );
   for( i = 0; i < nthr; i++ ) 
      if( pthread_create( &tid[ i ], NULL, RunCounter, (void*)( res + i ) ) != 0 )
         printf( "ошибка потока %d: %m", i ), exit( EXIT_FAILURE );
   sleep( sec );
   for( i = 0; i < nthr; i++ ) pthread_cancel( tid[ i ] );
   for( i = 0; i < nthr; i++ ) pthread_join( tid[ i ], NULL );
   free( tid );
   if( debug )
      for( i = 0; i < nthr; i++ ) 
         printf( "%lu%s", res[ i ], ( i == nthr - 1 ? "\n" : " " ) );
   double sum1 = 0, sum2 = 0, sq, md;
   for( i = 0; i < nthr; i++ ) {
      md = (double)res[ i ];
      sum1 += md;
      sum2 += md * md;
   }
   md = sum1 / nthr;                    // среднее
   sq = sqrt( sum2 / nthr - md * md );  // СКО
   printf( "операций %.0f : %d * %.0f [+/-%.2f%%]\n",
           sum1, nthr, md, 100. * sq / md );
   free( res );
   return( EXIT_SUCCESS );
}

Код: Выделить всё

[Olej@modules SpeedThread]$ time ./CSpeed1 3 -2
число потоков 3
процессоров в системе 4
650692431 697970366 787583924
операций 2136246721 : 3 * 712082240 [+/-7.97%]

real	0m2.002s
user	0m5.928s
sys	0m0.003s
CSpeed1.c
(2.31 КБ) 304 скачивания
P.S. И чуть обновлённая версия предыдущего варианта (только форматирование результата - для единообразия и сравнения):
CSpeed.c
(2.27 КБ) 282 скачивания

Аватара пользователя
Olej
Писатель
Сообщения: 21338
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 11 дек 2014, 14:53

Olej писал(а): Далее...
Модель №3, язык C, GCC:
Модель №3, GCC, язык C++ в варианте C++11 ... вряд ли C++ хоть что-то в реализации и поведении потоков может добавить относительно C, интересно посмотреть на нововведение (C++11) класс thread:: (<thrread>) :

Код: Выделить всё

#include <unistd.h>
#include <math.h>
#include <iostream> 
#include <iomanip> 
#include <thread>        
#include <string>        
#include <sstream>       

using namespace std;

void thrfunc( volatile long *res ) {         // потоковая функция
   pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL );
   pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, NULL );
   while( true ) ++*res;
}

class RunCounter : public thread {
private:
   long volatile count;
public:
   RunCounter() : thread( thrfunc, &count ), count( 0 ) {}
   void cancel() {
      ostringstream msg; 
      msg << get_id();
      pthread_cancel( stoul( msg.str() ) );
   }
   long getCount() { return count; }
};

int main( int argc, char* argv[] ) {
   int nthr = 2,                            // число потоков
       sec = 1;                             // интервал выполнения
   bool debug = false;
   if( argc > 1 ) {
      if( atoi( argv[ 1 ] ) != 0 )
         nthr = atoi( argv[ 1 ] );
      else
         cout << "запуск: " << argv[ 0 ] << " [потоков] [[-]секунд]" << endl,
         exit( EXIT_SUCCESS );
      if( argc > 2 && atoi( argv[ 2 ] ) != 0 ) {
         sec = atoi( argv[ 2 ] );
         if( sec < 0 ) {                // признак отладочной диагностики
            sec = -sec; debug = 1;
         }
      }
   }
   if( debug ) {
      printf( "число потоков %d\n", nthr );
      cpu_set_t mask;
      sched_getaffinity( 0, sizeof( cpu_set_t ), &mask );
      printf( "процессоров в системе %d\n", CPU_COUNT( &mask ) );
   }
   RunCounter *ar = new RunCounter[ nthr ]; // стартовали потоки
   sleep( sec );
   for( int i = 0; i < nthr; i++ ) ar[ i ].cancel(); 
   for( int i = 0; i < nthr; i++ ) ar[ i ].join(); 
   if( debug )
      for( int i = 0; i < nthr; i++ ) 
         cout << ar[ i ].getCount() << ( i == nthr - 1 ? "\n" : " " );
   double sum1 = 0, sum2 = 0, sq, md;
   for( int i = 0; i < nthr; i++ ) {
      md = (double)ar[ i ].getCount();
      sum1 += md;
      sum2 += md * md;
   }
   md = sum1 / nthr;                        // среднее
   sq = sqrt( sum2 / nthr - md * md );      // СКО
   cout << "операций " << (long)sum1 << " : " << nthr << " * "
        << (long)md << " [+/-" << setprecision( 3 )
        << 100. * sq / md << "%]" <<endl; 
   delete [] ar;
   return( EXIT_SUCCESS );
}
Вообще то удивляет (потрясает) как в языковых расширениях (C++, Boost, ...), делая обёртки для POSIX API :
- из нескольких десятков API pthread* - оставляется несколько единичных методов класса (остальные не нужны? ... с жиру бесимся? ;-) )
- а добраться до оригинального pthread_t для использования оригинальных API - дело ах как не простое :-o : из собственный метод thread::get_id() возвращает результат из собственного типа, который имеет ... только единственную операцию >> запись в поток вывода ... который и есть отображением pthread_t :cry:
- посмотрите как приходится вызывать элементарную отмену потока pthread_cancel()...

Но тем не менее:

Код: Выделить всё

[Olej@modules SpeedThread.4]$ time ./CppSpeed 3 -2
число потоков 3
процессоров в системе 4
733200968 685764992 842066152
операций 2261032112 : 3 * 753677370 [+/-8.68%]

real	0m2.009s
user	0m5.868s
sys	0m0.003s
Что интересно?

Код: Выделить всё

[Olej@modules SpeedThread.4]$ taskset -c 0-2 ./CppSpeed 3 -2
число потоков 3
процессоров в системе 3
753568774 776271481 826003631
операций 2355843886 : 3 * 785281295 [+/-3.85%]
Что переопределив афинити-маску процесса командой taskset, задача видит доступное в системе число процессоров именно как это переопределённое значение.
Вложения
CppSpeed.cc
(2.4 КБ) 318 скачиваний

Аватара пользователя
Olej
Писатель
Сообщения: 21338
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 11 дек 2014, 14:59

Olej писал(а):Модель №3, GCC, язык C++ в варианте C++11 ...
Интересно (будет) посмотреть что там в Boost реализации класса boost::thread ... но что-то (поверхностное изучение boost::thread ;-) ) мне подсказывает, что никаких абсолютно отличий там нет.
Судя по хронологии, реализацию C++11 просто списали один в один с реализации Boost (как, впрочем, было и с STL до его включения в C++ стандарт).

Аватара пользователя
Olej
Писатель
Сообщения: 21338
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 11 дек 2014, 15:27

Olej писал(а):Модель №3, GCC, язык C++ в варианте C++11 ...
МоделИ №2, язык Java...
Множественное число только потому, что хотелось посмотреть (сравнить) реализацию потоковой функции с наследованием класса Thread и с реализацией интерфейса Runnable ... хотя никаких там отличий не должно быть.

Код: Выделить всё

import java.lang.Thread;

class RunCounter implements Runnable {
   private static boolean finish = false;       // флаг завершения
   synchronized static void setFinal() { finish = true; };
   synchronized static boolean getFinal() { return finish; } 
   private long count;
   RunCounter() { count = 0;  }
   public void run() {
     while( !getFinal() ) count++;
   }
   long getCount() { return count; }
}

public class RunnableSpeed {
   public static void main( String[] args ) {
      int nthr = 2,                             // число потоков
          sec = 1;                              // интервал выполнения
      boolean debug = false;
      try {
         if( args.length > 0 ) 
            nthr = new Integer( args[ 0 ] ).intValue();
         if( args.length > 1 ) {
            sec = new Integer( args[ 1 ] ).intValue();
            if( debug = sec < 0 )               // признак отладочной диагностики
               sec = -sec;
         }
      }
      catch( java.lang.NumberFormatException ex ) {
         System.out.println( "запуск: java RunnableSpeed [потоков] [[-]секунд]" );
         System.exit( 1 );
      }
      if( debug ) System.out.println( "число потоков " + nthr );
      RunCounter ar[] = new RunCounter[ nthr ];
      for( int i = 0; i < nthr; i++ ) ar[ i ] = new RunCounter();
      Thread tr[] = new Thread[ nthr ];
      for( int i = 0; i < nthr; i++ ) ( tr[ i ] = new Thread( ar[ i ] )).start();
      try { Thread.sleep( sec * 1000 ); }
      catch( InterruptedException ex ) {
         System.out.println( "ошибка ожидания " + " : " + ex.toString() );
      }
      RunCounter.setFinal();
      for( int i = 0; i < nthr; i++ )           // ожидаем завершения
         try {
            tr[ i ].join();
         }
         catch( InterruptedException ex ) {
            System.out.println( "ошибка потока " + i + " : " + ex.toString() );
         }
      if( debug ) 
         for( int i = 0; i < nthr; i++ )
            System.out.print( ar[ i ].getCount() + ( i == nthr - 1 ? "\n" : " " ) );
      double sum1 = 0, sum2 = 0, sq, md;        // сбор статистики
      for( int i = 0; i < nthr; i++ ) {
         md = ar[ i ].getCount();
         sum1 += md;
         sum2 += md * md;
      }
      md = sum1 / nthr;                         // среднее
      sq = Math.sqrt( sum2 / nthr - md * md );  // СКО
      sum2 = ( (int)( 100. * sq / md ) * 100 ) / 100.; 
      System.out.println( "операций " + (int)sum1 + " : " +
                          nthr + " * " + (int)md + " [+/-" +
                          sum2 + "%]" );
   }
}

Код: Выделить всё

[Olej@modules SpeedThread.4]$ time java RunnableSpeed 3 -2
число потоков 3
14015742 14033947 12567514
операций 40617203 : 3 * 13539067 [+/-5.0%]

real	0m2.081s
user	0m3.910s
sys	0m1.112s
[Olej@modules SpeedThread.4]$ java RunnableSpeed help
запуск: java RunnableSpeed [потоков] [[-]секунд]

Код: Выделить всё

import java.lang.Thread;

class RunCounter extends Thread {
   private static boolean finish = false;       // флаг завершения
   synchronized static void setFinal() { finish = true; };
   synchronized static boolean getFinal() { return finish; } 
   private long count;
   RunCounter() { count = 0;  }
   public void run() {
     while( !getFinal() ) count++;
   }
   long getCount() { return count; }
}

public class ThreadSpeed {
   public static void main( String[] args ) {
      int nthr = 2,                             // число потоков
          sec = 1;                              // интервал выполнения
      boolean debug = false;
      try {
         if( args.length > 0 )
            nthr = new Integer( args[ 0 ] ).intValue();
         if( args.length > 1 ) {
            sec = new Integer( args[ 1 ] ).intValue();
            if( debug = sec < 0 )               // признак отладочной диагностики
               sec = -sec;
         }
      }
      catch( java.lang.NumberFormatException ex ) {
         System.out.println( "запуск: java ThreadSpeed [потоков] [[-]секунд]" );
         System.exit( 1 );
      }
      if( debug ) System.out.println( "число потоков " + nthr );
      RunCounter ar[] = new RunCounter[ nthr ];
      for( int i = 0; i < nthr; i++ ) ar[ i ] = new RunCounter();
      for( int i = 0; i < nthr; i++ ) ar[ i ].start();
      try { Thread.sleep( sec * 1000 ); }
      catch( InterruptedException ex ) {
         System.out.println( "ошибка ожидания " + " : " + ex.toString() );
      }
      RunCounter.setFinal();
      for( int i = 0; i < nthr; i++ )           // ожидаем завершения
         try {
            ar[ i ].join();
         }
         catch( InterruptedException ex ) {
            System.out.println( "ошибка потока " + i + " : " + ex.toString() );
         }
      if( debug )
         for( int i = 0; i < nthr; i++ )
            System.out.print( ar[ i ].getCount() + ( i == nthr - 1 ? "\n" : " " ) );
      double sum1 = 0, sum2 = 0, sq, md;        // сбор статистики
      for( int i = 0; i < nthr; i++ ) {
         md = ar[ i ].getCount();
         sum1 += md;
         sum2 += md * md;
      }
      md = sum1 / nthr;                         // среднее
      sq = Math.sqrt( sum2 / nthr - md * md );  // СКО
      sum2 = ( (int)( 100. * sq / md ) * 100 ) / 100.;
      System.out.println( "операций " + (int)sum1 + " : " +
                          nthr + " * " + (int)md + " [+/-" +
                          sum2 + "%]" );
   }
}

Код: Выделить всё

[Olej@modules SpeedThread.4]$ time java ThreadSpeed 3 -2
число потоков 3
16343427 13708330 14732887
операций 44784644 : 3 * 14928214 [+/-7.0%]

real	0m2.086s
user	0m3.657s
sys	0m1.006s
Здесь ничего неожиданного ... кроме:

Код: Выделить всё

   synchronized static void setFinal() { finish = true; };
   synchronized static boolean getFinal() { return finish; } 
Без чего приложение разваливается.

P.S. бросается в глаза, что объём выполненной "работы" на 2 порядка меньше, чем в C аналоге ... хотя абсолютные цифры ни о чём не говорят и не есть предметом рассмотрения.
Вложения
RunnableSpeed.java
(2.3 КБ) 321 скачивание
ThreadSpeed.java
(2.2 КБ) 320 скачиваний

Аватара пользователя
Olej
Писатель
Сообщения: 21338
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 11 дек 2014, 17:46

Olej писал(а):Модель №3, GCC, язык C++ в варианте C++11 ...
Модель ... :-o ... 2+ на языке Go:

Код: Выделить всё

package main
import( "fmt"; "os"; "strconv"; "runtime";
        "time"; "sync"; "sync/atomic"; "math" )

func main() {
   секунды, потоки := 1, 2             // интервал выполнения и число потоков
   debug := false
   var err error 
   if len( os.Args ) > 1 {
      потоки, err = strconv.Atoi( os.Args[ 1 ] )
      if err != nil {
         fmt.Printf( "запуск: %s [потоков] [[-]секунд]\n", os.Args[ 0 ] )
         os.Exit( 1 )
      } 
      if len( os.Args ) > 2 {
         секунды, err = strconv.Atoi( os.Args[ 2 ] )
         if err != nil { 
            fmt.Printf( "запуск: %s [потоков] [[-]секунд]\n", os.Args[ 0 ] )
            os.Exit( 1 )
         } 
         if секунды < 0 {              // признак отладочной диагностики
            секунды = -секунды; debug = true
         }
      }
   }
   if( debug ) {
      fmt.Printf( "число потоков %v\n", потоки );
      fmt.Printf( "процессоров в системе %v\n", runtime.NumCPU() )
   }  
   if потоки > 1 { 
	  runtime.GOMAXPROCS( потоки + 1 )  // число разрешённых процессоров 
   }
   res := make( [] uint64, потоки )    // массив результатов
   var wg sync.WaitGroup               // WaitGroup - барьер завершения
   var finish uint32 = 0
   for i := range res {
      wg.Add( 1 )
      go func( j int ) {               // функция сопрограммы 
         var count uint64 = 0
         defer wg.Done()
         defer func() {                // функция завершения
            res[ j ] = count
         } ()
         for {
            if atomic.LoadUint32( &finish ) != 0 { 
			   break
			}
            count++
            time.Sleep( 1 )            // 1 ns, но дать другим работать
         }
      } ( i )
   }
   time.Sleep( time.Duration( секунды ) * time.Second ) // наносекунды
   atomic.StoreUint32( &finish, 1 )
   wg.Wait()                           // ожидание завершения сопрограмм
   if debug {
	  for i := range res {
         fmt.Printf( "%v ", res[ i ] ) 
      }
      fmt.Printf( "\n" )    
   }
   var sum1, sum2, sq, md float64 = 0, 0, 0, 0
   for i := range res {
      md = float64( res[ i ] )
      sum1 += md
      sum2 += md * md
   } 
   md = sum1 / float64( потоки )                         // среднее
   sq = math.Sqrt( sum2 / float64( потоки ) - md * md )  // СКО
   fmt.Printf( "операций %.0f : %d * %.0f [+/-%.2f%%]\n",
               sum1, потоки, md, 100. * sq / md )
}
Модель 2+ потому, что сигнал завершения посылается через примитив синхронизации WaitGroup ... что-то на манер pthrread_barier_t в POSIX.
Лаконизм и ясность здесь в Go, конечно, потрясающие! : просто запустить фрагмент кода (даже функции не нужно специальной) на параллельное выполнение в N ветках... а при завершении каждой ветки (не важно по какому механизму) - выполнить в каждом экземпляре по 2 функции из стека завершения.

Код: Выделить всё

[Olej@modules SpeedThread.4]$ make GoSpeed
gccgo -g -O3 GoSpeed.go -o GoSpeed
[Olej@modules SpeedThread.4]$ time ./GoSpeed 3 -2
число потоков 3
процессоров в системе 4
82222 82333 82736 
операций 247291 : 3 * 82430 [+/-0.27%]

real	0m2.031s
user	0m1.109s
sys	0m0.956s
Вложения
GoSpeed.go
(2.67 КБ) 290 скачиваний

Аватара пользователя
Olej
Писатель
Сообщения: 21338
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 11 дек 2014, 19:35

Попалась очень интересная статья Х обсуждение в тему: Потоки в C++ против потоков в Go.
Гоурутины это не потоки а скорее смесь фиберов с пулом потоков, причём глубоко интегрированная в язык. Понятное дело что нативную реализацию потоков она будет крыть как бык овцу. Причём чем меньше в системе процессоров тем круче будет крыть ;)
По умолчанию при запуске приложение стартует главный горутин. Вы можете создать еще горутины, чтобы выполнять некие действия паралельно, но! Все горутины выполняются в одном потоке! Память процесса делится между ними, но эти потоки "легкие" и их концепция больше всего напоминает микротреды.
И оттуда есть любопытные ссылки на подобные обсуждения и сравнения.

P.S. Написано это 4 года назад, когда возрасту реализациям (совершенствованиям) Go было ~3 года (от 2007г.), а C++ - ~30 лет (от 1983г.). ;-)

Аватара пользователя
Olej
Писатель
Сообщения: 21338
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 11 дек 2014, 19:49

Olej писал(а):Попалась очень интересная статья Х обсуждение в тему: Потоки в C++ против потоков в Go.
Гоурутины это не потоки а скорее смесь фиберов с пулом потоков, причём глубоко интегрированная в язык. Понятное дело что нативную реализацию потоков она будет крыть как бык овцу. Причём чем меньше в системе процессоров тем круче будет крыть ;)
И ещё в продолжение разговора: Треды и фиберы.
Это перевод публикации ... кого-то из вирусописателей ;-) , написано это аж в 1999г., на примерах Windows 3.51 и Windows 4...
Но это существенно дополняет картину конкурентности, параллельности и многопроцессорности.

Аватара пользователя
Olej
Писатель
Сообщения: 21338
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 11 дек 2014, 20:30

И вот тут начинается самое интересное ;-) :

Код: Выделить всё

[Olej@modules SpeedThread.4]$ ./CSpeed1 1 -3
число потоков 1
процессоров в системе 4
1341953458
операций 1341953458 : 1 * 1341953458 [+/-0.00%]
Как легко видеть, на компьютере, на котором я смотрю результаты (в этот раз) 4 процессора (ядра).

Код: Выделить всё

[Olej@modules SpeedThread.4]$ ./CSpeed1 1 3
операций 1388581369 : 1 * 1388581369 [+/-0.00%]
[Olej@modules SpeedThread.4]$ ./CSpeed1 2 3
операций 1757067621 : 2 * 878533810 [+/-0.20%]
[Olej@modules SpeedThread.4]$ ./CSpeed1 3 3
операций 3218782198 : 3 * 1072927399 [+/-8.70%]
[Olej@modules SpeedThread.4]$ ./CSpeed1 4 3
операций 2247505021 : 4 * 561876255 [+/-8.97%]
[Olej@modules SpeedThread.4]$ ./CSpeed1 5 3
операций 2227315346 : 5 * 445463069 [+/-8.01%]
[Olej@modules SpeedThread.4]$ ./CSpeed1 10 3
операций 3085455637 : 10 * 308545564 [+/-28.58%]
[Olej@modules SpeedThread.4]$ ./CSpeed1 20 3
операций 4148029004 : 20 * 207401450 [+/-15.20%]
[Olej@modules SpeedThread.4]$ ./CSpeed1 50 3
операций 5016018082 : 50 * 100320362 [+/-9.77%]
[Olej@modules SpeedThread.4]$ ./CSpeed1 100 3
операций 5285964350 : 100 * 52859644 [+/-4.00%]

Производительность растёт монотонно от 1 до 3 ( :-o ) потоков ... на 4-х - спад. Похоже, что пассивно ожидающий поток распределён на процессор, а 4-й считающий поток "совмещается" с кем-то на одном активном процессе (это можно подыграть с афинити-масками ... но не сейчас).
Небольшой рост (якобы) производительности при большом числе потоков - обман: просто потоки запускаются последовательно в цикле, а отсчёт паузы работы идёт после запуска последнего, а первые за это время успевают втихаря наработать ;-)

Теперь Java:

Код: Выделить всё

[Olej@modules SpeedThread.4]$ java RunnableSpeed 1 3
операций 125399127 : 1 * 125399127 [+/-0.0%]
[Olej@modules SpeedThread.4]$ java RunnableSpeed 2 3
операций 68637864 : 2 * 34318932 [+/-80.0%]
[Olej@modules SpeedThread.4]$ java RunnableSpeed 3 3
операций 60708172 : 3 * 20236057 [+/-9.0%]
[Olej@modules SpeedThread.4]$ java RunnableSpeed 4 3
операций 76943841 : 4 * 19235960 [+/-2.0%]
[Olej@modules SpeedThread.4]$ java RunnableSpeed 5 3
операций 100339425 : 5 * 20067885 [+/-7.0%]
[Olej@modules SpeedThread.4]$ java RunnableSpeed 10 3
операций 100326079 : 10 * 10032607 [+/-7.0%]
[Olej@modules SpeedThread.4]$ java RunnableSpeed 20 3
операций 98121271 : 20 * 4906063 [+/-11.0%]
[Olej@modules SpeedThread.4]$ java RunnableSpeed 50 3
операций 97520611 : 50 * 1950412 [+/-21.0%]
А вот тут - сюрприз!
Одиночный поток нарабатывает практически в 2 раза больше, чем 2, 3, 4, ...
Такое впечатление, что "якобы потоки" организуются внутри JVM, которая сама по себе выполняется на одиночном процессоре. И повысить производительность Java приложения за счёт многих процессоров - не получается.
(Это очень похоже на то, что происходит в Python)
Но JVM при этом создаёт именно нативные потоки (pthread_t), ядерные. Что легко проверить так:

Код: Выделить всё

[Olej@modules SpeedThread.4]$ java RunnableSpeed 1000 3
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:714)
    at RunnableSpeed.main(RunnableSpeed.java:38)
ABRT problem creation: 'success'

Убито

И вам придётся kill java со стороннего терминала.
Это то ограничение, которое установлено софтверно в системе на число потоков/процессов (а Linux, как помним, потоки и процессы не различает):

Код: Выделить всё

[Olej@modules SpeedThread.4]$ ulimit -S -u
1024
И эту границу можно изменить, в принципе, до величины:

Код: Выделить всё

[Olej@modules SpeedThread.4]$ ulimit -H -u
31384
Теперь Go:

Код: Выделить всё

[Olej@modules SpeedThread.4]$ ./GoSpeed 1 3
операций 388233 : 1 * 388233 [+/-0.00%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 2 3
операций 393233 : 2 * 196616 [+/-0.28%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 3 3
операций 357836 : 3 * 119279 [+/-0.35%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 4 3
операций 389318 : 4 * 97330 [+/-0.39%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 5 3
операций 456734 : 5 * 91347 [+/-0.22%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 10 3
операций 648778 : 10 * 64878 [+/-0.14%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 20 3
операций 999374 : 20 * 49969 [+/-0.18%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 50 3
операций 1562606 : 50 * 31252 [+/-0.10%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 100 3
операций 1796926 : 100 * 17969 [+/-0.16%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 500 3
операций 1708389 : 500 * 3417 [+/-0.29%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 1000 3
операций 1485738 : 1000 * 1486 [+/-0.60%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 5000 3
операций 1300043 : 5000 * 260 [+/-1.37%]
Go легко создаёт конкурирующих и 1000 и 5000 сопрограмм, которые, похоже, совместно разделяют максимально возможную для задачи производительность.
Опять же, сравним с исходным кодом C:

Код: Выделить всё

[Olej@modules SpeedThread.4]$ ./CSpeed1 1000 3
ошибка потока 242: Resource temporarily unavailable[Olej@modules SpeedThread.4]$
Т.е. сопрограммы Go - это совсем не нативные потоки ядра!

Аватара пользователя
Olej
Писатель
Сообщения: 21338
Зарегистрирован: 24 сен 2011, 14:22
Откуда: Харьков
Контактная информация:

Re: параллельность + синхронизации (примеры)

Непрочитанное сообщение Olej » 11 дек 2014, 20:43

Olej писал(а): Go легко создаёт конкурирующих и 1000 и 5000 сопрограмм, которые, похоже, совместно разделяют максимально возможную для задачи производительность.
С Go я ошибся!
Go замечательно справляется с загрузкой всех доступных процессоров.
Просто в коде примера некорректно изменялось число доступных к использованию процессоров (по дефаулту Go действительно использует только 1 процессор):

Код: Выделить всё

[Olej@modules SpeedThread.4]$ ./GoSpeed 1 3
операций 423494 : 1 * 423494 [+/-0.00%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 2 3
операций 832823 : 2 * 416412 [+/-0.00%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 3 3
операций 1266382 : 3 * 422127 [+/-0.00%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 4 3
операций 1728978 : 4 * 432244 [+/-0.00%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 5 3
операций 2130759 : 5 * 426152 [+/-0.00%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 6 3
операций 2596502 : 6 * 432750 [+/-0.00%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 10 3
операций 3943839 : 10 * 394384 [+/-0.00%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 20 3
операций 6030717 : 20 * 301536 [+/-0.00%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 50 3
операций 7776333 : 50 * 155527 [+/-0.00%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 100 3
операций 9901527 : 100 * 99015 [+/-0.00%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 1000 3
операций 6342230 : 1000 * 6342 [+/-0.01%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 5000 3
операций 5524867 : 5000 * 1105 [+/-6.15%]
[Olej@modules SpeedThread.4]$ ./GoSpeed 10000 3
операций 5056108 : 10000 * 506 [+/-14.81%]
Вложения
GoSpeed.go
(2.67 КБ) 290 скачиваний

Ответить

Вернуться в «Программирование»

Кто сейчас на конференции

Сейчас этот форум просматривают: нет зарегистрированных пользователей и 7 гостей