¿Cómo procesa Hadoop los registros divididos entre los límites de los bloques?

Resuelto Praveen Sripati asked hace 11 años • 6 respuestas

De acuerdo con laHadoop - The Definitive Guide

Los registros lógicos que define FileInputFormats generalmente no encajan perfectamente en los bloques HDFS. Por ejemplo, los registros lógicos de TextInputFormat son líneas, que cruzarán los límites de HDFS la mayoría de las veces. Esto no tiene relación con el funcionamiento de su programa (por ejemplo, las líneas no se pierden ni se interrumpen), pero vale la pena saberlo, ya que significa que los mapas de datos locales (es decir, mapas que se ejecutan en el mismo host que sus datos de entrada) realizará algunas lecturas remotas. Los ligeros gastos generales que esto provoca normalmente no son significativos.

Supongamos que una línea de registro se divide en dos bloques (b1 y b2). El asignador que procesa el primer bloque (b1) notará que la última línea no tiene un separador EOL y recupera el resto de la línea del siguiente bloque de datos (b2).

¿Cómo determina el asignador que procesa el segundo bloque (b2) que el primer registro está incompleto y debe procesarse a partir del segundo registro del bloque (b2)?

Praveen Sripati avatar Jan 12 '13 14:01 Praveen Sripati
Aceptado

Pregunta interesante, pasé algún tiempo mirando el código para obtener más detalles y aquí están mis pensamientos. Las divisiones son manejadas por el cliente InputFormat.getSplits, por lo que un vistazo a FileInputFormat proporciona la siguiente información:

  • Para cada archivo de entrada, obtenga la longitud del archivo, el tamaño del bloque y calcule el tamaño de división max(minSize, min(maxSize, blockSize))donde maxSizecorresponde mapred.max.split.sizey minSizees mapred.min.split.size.
  • Divida el archivo en diferentes FileSplitarchivos según el tamaño de división calculado anteriormente. Lo importante aquí es que cada uno FileSplitse inicializa con un startparámetro correspondiente al desplazamiento en el archivo de entrada . Aún no hay manejo de las líneas en ese momento. La parte relevante del código se ve así:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

Después de eso, si observa cuál LineRecordReaderestá definido por TextInputFormat, ahí es donde se manejan las líneas:

  • Cuando inicializas tu, LineRecordReaderintenta crear una instancia de una LineReaderque es una abstracción para poder leer líneas FSDataInputStream. Hay 2 casos:
  • Si hay uno CompressionCodecdefinido, entonces este códec es responsable de manejar los límites. Probablemente no sea relevante para su pregunta.
  • Sin embargo, si no hay ningún códec, ahí es donde las cosas son interesantes: si el startde tu InputSplites diferente de 0, entonces retrocedes 1 carácter y luego te saltas la primera línea que encuentras identificada por \n o \r\n (Windows) . El retroceso es importante porque en caso de que los límites de su línea sean los mismos que los límites divididos, esto garantiza que no se salte la línea válida. Aquí está el código relevante:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

Entonces, dado que las divisiones se calculan en el cliente, los mapeadores no necesitan ejecutarse en secuencia, cada mapeador ya sabe si necesita descartar la primera línea o no.

Básicamente, si tiene 2 líneas de 100 Mb cada una en el mismo archivo, y para simplificar, digamos que el tamaño de división es 64 Mb. Luego, cuando se calculen las divisiones de entrada, tendremos el siguiente escenario:

  • División 1 que contiene la ruta y los hosts de este bloque. Inicializado al inicio 200-200=0Mb, longitud 64Mb.
  • División 2 inicializada al inicio 200-200+64=64Mb, longitud 64Mb.
  • División 3 inicializada al inicio 200-200+128=128Mb, longitud 64Mb.
  • División 4 inicializada al inicio 200-200+192=192Mb, longitud 8Mb.
  • El asignador A procesará la división 1, el inicio es 0, así que no se salte la primera línea y lea una línea completa que supere el límite de 64 Mb, por lo que necesita lectura remota.
  • El mapeador B procesará la división 2, el inicio es != 0, así que omita la primera línea después de 64 Mb-1 byte, que corresponde al final de la línea 1 en 100 Mb que todavía está en la división 2, tenemos 28 Mb de la línea en la división 2, entonces Lectura remota de los 72Mb restantes.
  • Mapper C procesará la división 3, el inicio es! = 0, así que omita la primera línea después de 128 Mb-1 byte, que corresponde al final de la línea 2 en 200 Mb, que es el final del archivo, así que no haga nada.
  • El asignador D es igual que el asignador C excepto que busca una nueva línea después de 192Mb-1byte.
Charles Menguy avatar Jan 26 '2013 18:01 Charles Menguy

El algoritmo Map Reduce no funciona en bloques físicos del archivo. Funciona en divisiones de entrada lógica. La división de entrada depende de dónde se escribió el registro. Un registro puede abarcar dos Mappers.

HDFS : Tt divide archivos muy grandes en bloques grandes (por ejemplo, que miden 128 MB) y almacena tres copias de estos bloques en diferentes nodos del clúster.

HDFS no tiene conocimiento del contenido de estos archivos. Es posible que se haya iniciado un registro en el Bloque-a , pero el final de ese registro puede estar presente en el Bloque-b .

Para resolver este problema, Hadoop utiliza una representación lógica de los datos almacenados en bloques de archivos, conocida como divisiones de entrada.

Cuando un cliente de trabajo MapReduce calcula las divisiones de entrada , calcula dónde comienza el primer registro completo de un bloque y dónde termina el último registro del bloque .

El punto clave :

En los casos en que el último registro de un bloque esté incompleto, la división de entrada incluye información de ubicación para el siguiente bloque y el desplazamiento de bytes de los datos necesarios para completar el registro.

Se pueden leer más detalles en la documentación de hadoop.apache.org

El marco Map-Reduce se basa en el InputFormat del trabajo para:

  1. Valide la especificación de entrada del trabajo.
  2. Divida los archivos de entrada en InputSplits lógicos, cada uno de los cuales luego se asigna a un asignador individual.
  3. Luego, cada InputSplit se asigna a un asignador individual para su procesamiento. La división podría ser tupla <input-file-path, start, offset> . InputSplit[] getSplits(JobConf job,int numSplits) es la API que se encarga de estas cosas.

FileInputFormat , que extiende el método InputFormatimplementado getSplits(). Eche un vistazo a los aspectos internos de este método en grepcode

Ravindra babu avatar Jan 12 '2016 06:01 Ravindra babu