资料
  • 资料
  • 专题
[完结13章]一课掌握Java并发编程精髓
推荐星级:
类别: 软件/EDA/IP
时间:2023-12-22
大小:3.93KB
阅读数:138
上传用户:开心就很好了
查看他发布的资源
下载次数
1
所需E币
0
ebi
新用户注册即送 300 E币
更多E币赚取方法,请查看
close
资料介绍
Java并发编程从入门到进阶 多场景实战,众所周知,并发编程是优秀工程师的标准之一,但知识庞杂,复杂性高,常常让人望而却步。但如果没有掌握背后的核心原理,你开发的代码可能会成为难以调试和优化的头疼问题。在此,我将通过上百个案例场景驱动教学+动画直观演示,帮助大家深入、直观地理解并发编程核心概念和底层原理。助力大家在实际工作和面试中都能尽早脱颖而出。

首先,我们先来了解关于并发的基本概念。
并发情况主要会引出三个基本概念,分别是原子性、可见性、有序性三个基本概念

Java中线程的状态分为6种:
1. 初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
2. 运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。
线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。
3. 阻塞(BLOCKED):表示线程阻塞于锁。
4. 等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。
5. 超时等待(TIMED_WAITING):该状态不同于WAITING,它可以在指定的时间后自行返回。
6. 终止(TERMINATED):表示该线程已经执行完毕。

其实我们可以通过job.setPartitionerClass来设置分区类,不过目前我们是没有设置的,那框架中是不是有默认值啊,是有的,我们可以通过job.getPartitionerClass方法看到默认情况下会使用HashPartitioner这个分区类
那我们来看一下HashPartitioner的实现是什么样子的
/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
下面我们来具体跑一个这份数据,首先复制一份WordCountJob的代码,新的类名为WordCountJobSkew
package com.imooc.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * 数据倾斜-增加Reduce任务个数
 *
 * Created by xuwei
 */
public class WordCountJobSkew {
    /**
     * Map阶段
     */
    public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
        Logger logger = LoggerFactory.getLogger(MyMapper.class);
        /**
         * 需要实现map函数
         * 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
         * @param k1
         * @param v1
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            //输出k1,v1的值
            //System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
            //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
            //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
            //对获取到的每一行数据进行切割,把单词切割出来
            String[] words = v1.toString().split(" ");
            //把单词封装成<k2,v2>的形式
            Text k2 = new Text(words[0]);
            LongWritable v2 = new LongWritable(1L);
            //把<k2,v2>写出去
            context.write(k2,v2);
        }
    }


    /**
     * Reduce阶段
     */
    public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        Logger logger = LoggerFactory.getLogger(MyReducer.class);
        /**
         * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
         * @param k2
         * @param v2s
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            //创建一个sum变量,保存v2s的和
            long sum = 0L;
            //对v2s中的数据进行累加求和
            for(LongWritable v2: v2s){
                //输出k2,v2的值
                //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                sum += v2.get();
//模拟Reduce的复杂计算消耗的时间
                if(sum % 200 ==0){
                    Thread.sleep(1);
                }

            }

            //组装k3,v3
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            //输出k3,v3的值
            //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
            //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
            // 把结果写出去
            context.write(k3,v3);
        }
    }

    /**
     * 组装Job=Map+Reduce
     */
    public static void main(String[] args) {
        try{
            if(args.length!=3){
                //如果传递的参数不够,程序直接退出
                System.exit(100);
            }

            //指定Job需要的配置参数
            Configuration conf = new Configuration();
            //创建一个Job
            Job job = Job.getInstance(conf);

            //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
            job.setJarByClass(WordCountJobSkew.class);

            //指定输入路径(可以是文件,也可以是目录)
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            //指定输出路径(只能指定一个不存在的目录)
            FileOutputFormat.setOutputPath(job,new Path(args[1]));

            //指定map相关的代码
            job.setMapperClass(MyMapper.class);
            //指定k2的类型
            job.setMapOutputKeyClass(Text.class);
            //指定v2的类型
            job.setMapOutputValueClass(LongWritable.class);



            //指定reduce相关的代码
            job.setReducerClass(MyReducer.class);
            //指定k3的类型
            job.setOutputKeyClass(Text.class);
            //指定v3的类型
            job.setOutputValueClass(LongWritable.class);
            //设置reduce任务个数
            job.setNumReduceTasks(Integer.parseInt(args[2]));

            //提交job
            job.waitForCompletion(true);
        }catch(Exception e){
            e.printStackTrace();
        }

    }


}
针对这个操作我们需要去修改代码,在这里我们再重新复制一个类,基于WordCountJobSkew复制,新的类名是WordCountJobSkewRandKey
package com.imooc.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Random;

/**
 * 数据倾斜-把倾斜的数据打散
 *
 * Created by xuwei
 */
public class WordCountJobSkewRandKey {
    /**
     * Map阶段
     */
    public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
        Logger logger = LoggerFactory.getLogger(MyMapper.class);
        Random random = new Random();
        /**
         * 需要实现map函数
         * 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
         * @param k1
         * @param v1
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            //输出k1,v1的值
            //System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
            //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
            //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
            //对获取到的每一行数据进行切割,把单词切割出来
            String[] words = v1.toString().split(" ");
            //把单词封装成<k2,v2>的形式
            String key = words[0];
            if("5".equals(key)){
                //把倾斜的key打散,分成10份
                key = "5"+"_"+random.nextInt(10);
            }
            Text k2 = new Text(key);
            LongWritable v2 = new LongWritable(1L);
            //把<k2,v2>写出去
            context.write(k2,v2);
        }
    }


    /**
     * Reduce阶段
     */
    public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        Logger logger = LoggerFactory.getLogger(MyReducer.class);
        /**
         * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
         * @param k2
         * @param v2s
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            //创建一个sum变量,保存v2s的和
            long sum = 0L;
            //对v2s中的数据进行累加求和
            for(LongWritable v2: v2s){
                //输出k2,v2的值
                //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                sum += v2.get();
                //模拟Reduce的复杂计算消耗的时间
                if(sum % 200 ==0){
                    Thread.sleep(1);
                }

            }

            //组装k3,v3
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            //输出k3,v3的值
            //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
            //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
            // 把结果写出去
            context.write(k3,v3);
        }
    }

    /**
     * 组装Job=Map+Reduce
     */
    public static void main(String[] args) {
        try{
            if(args.length!=3){
                //如果传递的参数不够,程序直接退出
                System.exit(100);
            }

            //指定Job需要的配置参数
            Configuration conf = new Configuration();
            //创建一个Job
            Job job = Job.getInstance(conf);

            //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
            job.setJarByClass(WordCountJobSkewRandKey.class);

            //指定输入路径(可以是文件,也可以是目录)
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            //指定输出路径(只能指定一个不存在的目录)
            FileOutputFormat.setOutputPath(job,new Path(args[1]));

            //指定map相关的代码
            job.setMapperClass(MyMapper.class);
            //指定k2的类型
            job.setMapOutputKeyClass(Text.class);
            //指定v2的类型
            job.setMapOutputValueClass(LongWritable.class);



            //指定reduce相关的代码
            job.setReducerClass(MyReducer.class);
            //指定k3的类型
            job.setOutputKeyClass(Text.class);
            //指定v3的类型
            job.setOutputValueClass(LongWritable.class);
            //设置reduce任务个数
            job.setNumReduceTasks(Integer.parseInt(args[2]));

            //提交job
            job.waitForCompletion(true);
        }catch(Exception e){
            e.printStackTrace();
        }

    }


}
调用parallelize()时,有一个重要的参数可以指定,就是将集合切分成多少个partition。
Spark会为每一个partition运行一个task来进行处理。
Spark默认会根据集群的配置来设置partition的数量。我们也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量,例如:parallelize(arr, 5)
scala代码如下:
package com.imooc.scala

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 需求:使用集合创建RDD
 * Created by xuwei
 */
object CreateRddByArrayScala {

  def main(args: Array[String]): Unit = {
    //创建SparkContext
    val conf = new SparkConf()
    conf.setAppName("CreateRddByArrayScala ")//设置任务名称
    .setMaster("local")//local表示在本地执行
    val sc = new SparkContext(conf)

    //创建集合
    val arr = Array(1,2,3,4,5)
    //基于集合创建RDD
    val rdd = sc.parallelize(arr)
    val sum = rdd.reduce(_ + _)
    println(sum)

    //停止SparkContext
    sc.stop()

  }

}

版权说明:本资料由用户提供并上传,仅用于学习交流;若内容存在侵权,请进行举报,或 联系我们 删除。
PARTNER CONTENT
相关评论 (下载后评价送E币 我要评论)
没有更多评论了
  • 可能感兴趣
  • 关注本资料的网友还下载了
  • 技术白皮书