所需E币: 0
时间: 2023-12-22 10:13
大小: 3.93KB
Java并发编程从入门到进阶多场景实战,众所周知,并发编程是优秀工程师的标准之一,但知识庞杂,复杂性高,常常让人望而却步。但如果没有掌握背后的核心原理,你开发的代码可能会成为难以调试和优化的头疼问题。在此,我将通过上百个案例场景驱动教学+动画直观演示,帮助大家深入、直观地理解并发编程核心概念和底层原理。助力大家在实际工作和面试中都能尽早脱颖而出。首先,我们先来了解关于并发的基本概念。并发情况主要会引出三个基本概念,分别是原子性、可见性、有序性三个基本概念Java中线程的状态分为6种:1.初始(NEW):新创建了一个线程对象,但还没有调用start()方法。2.运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。3.阻塞(BLOCKED):表示线程阻塞于锁。4.等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。5.超时等待(TIMED_WAITING):该状态不同于WAITING,它可以在指定的时间后自行返回。6.终止(TERMINATED):表示该线程已经执行完毕。其实我们可以通过job.setPartitionerClass来设置分区类,不过目前我们是没有设置的,那框架中是不是有默认值啊,是有的,我们可以通过job.getPartitionerClass方法看到默认情况下会使用HashPartitioner这个分区类那我们来看一下HashPartitioner的实现是什么样子的/**Partitionkeysbytheir{@linkObject#hashCode()}.*/@InterfaceAudience.Public@InterfaceStability.StablepublicclassHashPartitioner<K,V>extendsPartitioner<K,V>{ /**Use{@linkObject#hashCode()}topartition.*/ publicintgetPartition(Kkey,Vvalue, intnumReduceTasks){ return(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks; }}下面我们来具体跑一个这份数据,首先复制一份WordCountJob的代码,新的类名为WordCountJobSkewpackagecom.imooc.mr;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.io.IOException;/** *数据倾斜-增加Reduce任务个数 * *Createdbyxuwei */publicclassWordCountJobSkew{ /** *Map阶段 */ publicstaticclassMyMapperextendsMapper<LongWritable,Text,Text,LongWritable>{ Loggerlogger=LoggerFactory.getLogger(MyMapper.class); /** *需要实现map函数 *这个map函数就是可以接收<k1,v1>,产生<k2,v2> *@paramk1 *@paramv1 *@paramcontext *@throwsIOException *@throwsInterruptedException */ @Override protectedvoidmap(LongWritablek1,Textv1,Contextcontext) throwsIOException,InterruptedException{ //输出k1,v1的值 //System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); //k1代表的是每一行数据的行首偏移量,v1代表的是每一行内容 //对获取到的每一行数据进行切割,把单词切割出来 String[]words=v1.toString().split(""); //把单词封装成<k2,v2>的形式 Textk2=newText(words[0]); LongWritablev2=newLongWritable(1L); //把<k2,v2>写出去 context.write(k2,v2); } } /** *Reduce阶段 */ publicstaticclassMyReducerextendsReducer<Text,LongWritable,Text,LongWritable>{ Loggerlogger=LoggerFactory.getLogger(MyReducer.class); /** *针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去 *@paramk2 *@paramv2s *@paramcontext *@throwsIOException *@throwsInterruptedException */ @Override protectedvoidreduce(Textk2,Iterable<LongWritable>v2s,Contextcontext) throwsIOException,InterruptedException{ //创建一个sum变量,保存v2s的和 longsum=0L; //对v2s中的数据进行累加求和 for(LongWritablev2:v2s){ //输出k2,v2的值 //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">"); //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">"); sum+=v2.get();//模拟Reduce的复杂计算消耗的时间 if(sum%200==0){ Thread.sleep(1); } } //组装k3,v3 Textk3=k2; LongWritablev3=newLongWritable(sum); //输出k3,v3的值 //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); //把结果写出去 context.write(k3,v3); } } /** *组装Job=Map+Reduce */ publicstaticvoidmain(String[]args){ try{ if(args.length!=3){ //如果传递的参数不够,程序直接退出 System.exit(100); } //指定Job需要的配置参数 Configurationconf=newConfiguration(); //创建一个Job Jobjob=Job.getInstance(conf); //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的 job.setJarByClass(WordCountJobSkew.class); //指定输入路径(可以是文件,也可以是目录) FileInputFormat.setInputPaths(job,newPath(args[0])); //指定输出路径(只能指定一个不存在的目录) FileOutputFormat.setOutputPath(job,newPath(args[1])); //指定map相关的代码 job.setMapperClass(MyMapper.class); //指定k2的类型 job.setMapOutputKeyClass(Text.class); //指定v2的类型 job.setMapOutputValueClass(LongWritable.class); //指定reduce相关的代码 job.setReducerClass(MyReducer.class); //指定k3的类型 job.setOutputKeyClass(Text.class); //指定v3的类型 job.setOutputValueClass(LongWritable.class); //设置reduce任务个数 job.setNumReduceTasks(Integer.parseInt(args[2])); //提交job job.waitForCompletion(true); }catch(Exceptione){ e.printStackTrace(); } }}针对这个操作我们需要去修改代码,在这里我们再重新复制一个类,基于WordCountJobSkew复制,新的类名是WordCountJobSkewRandKeypackagecom.imooc.mr;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.io.IOException;importjava.util.Random;/** *数据倾斜-把倾斜的数据打散 * *Createdbyxuwei */publicclassWordCountJobSkewRandKey{ /** *Map阶段 */ publicstaticclassMyMapperextendsMapper<LongWritable,Text,Text,LongWritable>{ Loggerlogger=LoggerFactory.getLogger(MyMapper.class); Randomrandom=newRandom(); /** *需要实现map函数 *这个map函数就是可以接收<k1,v1>,产生<k2,v2> *@paramk1 *@paramv1 *@paramcontext *@throwsIOException *@throwsInterruptedException */ @Override protectedvoidmap(LongWritablek1,Textv1,Contextcontext) throwsIOException,InterruptedException{ //输出k1,v1的值 //System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); //k1代表的是每一行数据的行首偏移量,v1代表的是每一行内容 //对获取到的每一行数据进行切割,把单词切割出来 String[]words=v1.toString().split(""); //把单词封装成<k2,v2>的形式 Stringkey=words[0]; if("5".equals(key)){ //把倾斜的key打散,分成10份 key="5"+"_"+random.nextInt(10); } Textk2=newText(key); LongWritablev2=newLongWritable(1L); //把<k2,v2>写出去 context.write(k2,v2); } } /** *Reduce阶段 */ publicstaticclassMyReducerextendsReducer<Text,LongWritable,Text,LongWritable>{ Loggerlogger=LoggerFactory.getLogger(MyReducer.class); /** *针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去 *@paramk2 *@paramv2s *@paramcontext *@throwsIOException *@throwsInterruptedException */ @Override protectedvoidreduce(Textk2,Iterable<LongWritable>v2s,Contextcontext) throwsIOException,InterruptedException{ //创建一个sum变量,保存v2s的和 longsum=0L; //对v2s中的数据进行累加求和 for(LongWritablev2:v2s){ //输出k2,v2的值 //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">"); //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">"); sum+=v2.get(); //模拟Reduce的复杂计算消耗的时间 if(sum%200==0){ Thread.sleep(1); } } //组装k3,v3 Textk3=k2; LongWritablev3=newLongWritable(sum); //输出k3,v3的值 //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); //把结果写出去 context.write(k3,v3); } } /** *组装Job=Map+Reduce */ publicstaticvoidmain(String[]args){ try{ if(args.length!=3){ //如果传递的参数不够,程序直接退出 System.exit(100); } //指定Job需要的配置参数 Configurationconf=newConfiguration(); //创建一个Job Jobjob=Job.getInstance(conf); //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的 job.setJarByClass(WordCountJobSkewRandKey.class); //指定输入路径(可以是文件,也可以是目录) FileInputFormat.setInputPaths(job,newPath(args[0])); //指定输出路径(只能指定一个不存在的目录) FileOutputFormat.setOutputPath(job,newPath(args[1])); //指定map相关的代码 job.setMapperClass(MyMapper.class); //指定k2的类型 job.setMapOutputKeyClass(Text.class); //指定v2的类型 job.setMapOutputValueClass(LongWritable.class); //指定reduce相关的代码 job.setReducerClass(MyReducer.class); //指定k3的类型 job.setOutputKeyClass(Text.class); //指定v3的类型 job.setOutputValueClass(LongWritable.class); //设置reduce任务个数 job.setNumReduceTasks(Integer.parseInt(args[2])); //提交job job.waitForCompletion(true); }catch(Exceptione){ e.printStackTrace(); } }}调用parallelize()时,有一个重要的参数可以指定,就是将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark默认会根据集群的配置来设置partition的数量。我们也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量,例如:parallelize(arr,5)scala代码如下:packagecom.imooc.scalaimportorg.apache.spark.{SparkConf,SparkContext}/** *需求:使用集合创建RDD *Createdbyxuwei */objectCreateRddByArrayScala{ defmain(args:Array[String]):Unit={ //创建SparkContext valconf=newSparkConf() conf.setAppName("CreateRddByArrayScala")//设置任务名称 .setMaster("local")//local表示在本地执行 valsc=newSparkContext(conf) //创建集合 valarr=Array(1,2,3,4,5) //基于集合创建RDD valrdd=sc.parallelize(arr) valsum=rdd.reduce(_+_) println(sum) //停止SparkContext sc.stop() }}